gobblin-commits mailing list archives

From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-309] Disabled rewrite and enabled retry for adding jar file on HDFS
Date Mon, 13 Nov 2017 20:21:07 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master e3f9de1a4 -> a34a81a42


[GOBBLIN-309] Disabled rewrite and enabled retry for adding jar file on HDFS

Closes #2163 from autumnust/jarcopyfix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a34a81a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a34a81a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a34a81a4

Branch: refs/heads/master
Commit: a34a81a42a73705558e32f38af54835fcea47325
Parents: e3f9de1
Author: Lei Sun <autumnust@gmail.com>
Authored: Mon Nov 13 12:21:01 2017 -0800
Committer: Hung Tran <hutran@linkedin.com>
Committed: Mon Nov 13 12:21:01 2017 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   2 +
 .../runtime/mapreduce/MRJobLauncher.java        | 142 ++++++++++++-------
 2 files changed, 91 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a34a81a4/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c8de615..a563b43 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -182,6 +182,8 @@ public class ConfigurationKeys {
   public static final String CLEANUP_STAGING_DATA_BY_INITIALIZER = "cleanup.staging.data.by.initializer";
   public static final String CLEANUP_OLD_JOBS_DATA = "cleanup.old.job.data";
   public static final boolean DEFAULT_CLEANUP_OLD_JOBS_DATA = false;
+  public static final String MAXIMUM_JAR_COPY_RETRY_TIMES_KEY = JOB_JAR_FILES_KEY + ".uploading.retry.maximum";
+
 
   public static final String QUEUED_TASK_TIME_MAX_SIZE = "taskexecutor.queued_task_time.history.max_size";
   public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048;

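For context: the retry limit is read per job from this new key, with MRJobLauncher falling back to a default of 5 when the key is absent (see the second hunk below). The concrete property name depends on the value of JOB_JAR_FILES_KEY; assuming that constant is "job.jars" (its value elsewhere in ConfigurationKeys — an assumption here, not shown in this diff), a job properties file could cap jar-upload retries with a minimal sketch like:

    # Assumed full key: value of JOB_JAR_FILES_KEY + ".uploading.retry.maximum"
    job.jars.uploading.retry.maximum=3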
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a34a81a4/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index dcb6a14..9f17db1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -21,8 +21,10 @@ import java.io.DataOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -56,9 +58,9 @@ import com.google.common.io.Closer;
 import com.google.common.util.concurrent.ServiceManager;
 import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
-import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -114,6 +116,11 @@ public class MRJobLauncher extends AbstractJobLauncher {
   private static final String OUTPUT_DIR_NAME = "output";
   private static final String WORK_UNIT_LIST_FILE_EXTENSION = ".wulist";
 
+  // Configuration that makes uploading of jar files more reliable,
+  // since multiple Gobblin jobs share the same jar directory.
+  private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
+  private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;
+
   private static final Splitter SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
 
   private final Configuration conf;
@@ -134,13 +141,13 @@ public class MRJobLauncher extends AbstractJobLauncher {
 
   private final StateStore<TaskState> taskStateStore;
 
-  public MRJobLauncher(Properties jobProps)
-      throws Exception {
+  private final int jarFileMaximumRetry;
+
+  public MRJobLauncher(Properties jobProps) throws Exception {
     this(jobProps, null);
   }
 
-  public MRJobLauncher(Properties jobProps, SharedResourcesBroker<GobblinScopeTypes> instanceBroker)
-      throws Exception {
+  public MRJobLauncher(Properties jobProps, SharedResourcesBroker<GobblinScopeTypes> instanceBroker) throws Exception {
     this(jobProps, new Configuration(), instanceBroker);
   }
 
@@ -193,12 +200,16 @@ public class MRJobLauncher extends AbstractJobLauncher {
         new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, taskStateStore,
             outputTaskStateDir);
 
+    this.jarFileMaximumRetry =
+        jobProps.containsKey(ConfigurationKeys.MAXIMUM_JAR_COPY_RETRY_TIMES_KEY) ? Integer.parseInt(
+            jobProps.getProperty(ConfigurationKeys.MAXIMUM_JAR_COPY_RETRY_TIMES_KEY))
+            : MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT;
+
     startCancellationExecutor();
   }
 
   @Override
-  public void close()
-      throws IOException {
+  public void close() throws IOException {
     try {
       if (this.hadoopJobSubmitted && !this.job.isComplete()) {
         LOG.info("Killing the Hadoop MR job for job " + this.jobContext.getJobId());
@@ -215,8 +226,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
   }
 
   @Override
-  protected void runWorkUnits(List<WorkUnit> workUnits)
-      throws Exception {
+  protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
     String jobName = this.jobContext.getJobName();
     JobState jobState = this.jobContext.getJobState();
 
@@ -278,8 +288,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
   /**
    * Add dependent jars and files.
    */
-  private void addDependencies(Configuration conf)
-      throws IOException {
+  private void addDependencies(Configuration conf) throws IOException {
     TimingEvent distributedCacheSetupTimer =
         this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_DISTRIBUTED_CACHE_SETUP);
 
@@ -317,8 +326,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
   /**
   * Prepare the Hadoop MR job, including configuring the job and setting up the input/output paths.
    */
-  private void prepareHadoopJob(List<WorkUnit> workUnits)
-      throws IOException {
+  private void prepareHadoopJob(List<WorkUnit> workUnits) throws IOException {
     TimingEvent mrJobSetupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_SETUP);
 
     // Add dependent jars/files
@@ -386,34 +394,66 @@ public class MRJobLauncher extends AbstractJobLauncher {
    * so the mappers can use them.
    */
   @SuppressWarnings("deprecation")
-  private void addJars(Path jarFileDir, String jarFileList, Configuration conf)
-      throws IOException {
+  private void addJars(Path jarFileDir, String jarFileList, Configuration conf) throws IOException {
     LocalFileSystem lfs = FileSystem.getLocal(conf);
     for (String jarFile : SPLITTER.split(jarFileList)) {
       Path srcJarFile = new Path(jarFile);
       FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
+
       for (FileStatus status : fileStatusList) {
-        // SNAPSHOT jars should not be shared, as different jobs may be using different versions of it
-        Path baseDir = status.getPath().getName().contains("SNAPSHOT") ? this.unsharedJarsDir : jarFileDir;
-        // DistributedCache requires absolute path, so we need to use makeQualified.
-        Path destJarFile = new Path(this.fs.makeQualified(baseDir), status.getPath().getName());
-        if (!this.fs.exists(destJarFile)) {
-          // Copy the jar file from local file system to HDFS
-          this.fs.copyFromLocalFile(status.getPath(), destJarFile);
+        // For each FileStatus the first copy attempt may fail, either because the file already
+        // exists or because another job instance is still copying it, since all Gobblin jobs
+        // share the same jar file directory. retryCount keeps the retries bounded so they
+        // cannot go on forever and hang the job.
+        int retryCount = 0;
+        boolean shouldFileBeAddedIntoDC = true;
+        Path destJarFile = calculateDestJarFile(status, jarFileDir);
+        // Keep copying destJarFile to HDFS until it exists there and its size matches that of the local file.
+        while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
+          try {
+            if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
+              Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
+              throw new IOException("Waiting for file to complete on uploading ... ");
+            }
+            // Set the first parameter to false so the source file is not deleted.
+            // Set the second parameter to false so an existing file at the target is not
+            // overwritten (by default it is true). If the file already exists and the
+            // overwrite flag is false, an IOException is thrown.
+            this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
+          } catch (IOException | InterruptedException e) {
+            LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
+            retryCount += 1;
+            if (retryCount >= this.jarFileMaximumRetry) {
+              LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
+              // If retry reaches upper limit, skip copying this file.
+              shouldFileBeAddedIntoDC = false;
+              break;
+            }
+          }
+        }
+        if (shouldFileBeAddedIntoDC) {
+          // Then add the jar file on HDFS to the classpath
+          LOG.info(String.format("Adding %s to classpath", destJarFile));
+          DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
         }
-        // Then add the jar file on HDFS to the classpath
-        LOG.info(String.format("Adding %s to classpath", destJarFile));
-        DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
       }
     }
   }
 
   /**
+   * Calculate the target path of the jar file to be copied to HDFS,
+   * given the {@link FileStatus} of the jar file and the directory that contains it.
+   */
+  private Path calculateDestJarFile(FileStatus status, Path jarFileDir) {
+    // SNAPSHOT jars should not be shared, as different jobs may be using different versions of it
+    Path baseDir = status.getPath().getName().contains("SNAPSHOT") ? this.unsharedJarsDir : jarFileDir;
+    // DistributedCache requires absolute path, so we need to use makeQualified.
+    return new Path(this.fs.makeQualified(baseDir), status.getPath().getName());
+  }
+
+  /**
    * Add local non-jar files the job depends on to DistributedCache.
    */
   @SuppressWarnings("deprecation")
-  private void addLocalFiles(Path jobFileDir, String jobFileList, Configuration conf)
-      throws IOException {
+  private void addLocalFiles(Path jobFileDir, String jobFileList, Configuration conf) throws IOException {
     DistributedCache.createSymlink(conf);
     for (String jobFile : SPLITTER.split(jobFileList)) {
       Path srcJobFile = new Path(jobFile);
@@ -446,8 +486,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     }
   }
 
-  private void addHdfsJars(String hdfsJarFileList, Configuration conf)
-      throws IOException {
+  private void addHdfsJars(String hdfsJarFileList, Configuration conf) throws IOException {
     for (String jarFile : SPLITTER.split(hdfsJarFileList)) {
       FileStatus[] status = this.fs.listStatus(new Path(jarFile));
       for (FileStatus fileStatus : status) {
@@ -464,8 +503,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
    * Prepare the job input.
    * @throws IOException
    */
-  private void prepareJobInput(List<WorkUnit> workUnits)
-      throws IOException {
+  private void prepareJobInput(List<WorkUnit> workUnits) throws IOException {
     Closer closer = Closer.create();
     try {
       ParallelRunner parallelRunner = closer.register(new ParallelRunner(this.parallelRunnerThreads, this.fs));
@@ -513,8 +551,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
   * Create a {@link org.apache.gobblin.metrics.GobblinMetrics} instance for this job run from the Hadoop counters.
    */
   @VisibleForTesting
-  void countersToMetrics(GobblinMetrics metrics)
-      throws IOException {
+  void countersToMetrics(GobblinMetrics metrics) throws IOException {
     Optional<Counters> counters = Optional.fromNullable(this.job.getCounters());
 
     if (counters.isPresent()) {
@@ -532,8 +569,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     }
   }
 
-  private static FileSystem buildFileSystem(Properties jobProps, Configuration configuration)
-      throws IOException {
-  private static FileSystem buildFileSystem(Properties jobProps, Configuration configuration)
-      throws IOException {
+  private static FileSystem buildFileSystem(Properties jobProps, Configuration configuration) throws IOException {
     URI fsUri = URI.create(jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, ConfigurationKeys.LOCAL_FS_URI));
     return FileSystem.newInstance(fsUri, configuration);
   }
@@ -577,9 +613,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
         boolean foundStateFile = false;
         for (Path dcPath : DistributedCache.getLocalCacheFiles(context.getConfiguration())) {
           if (dcPath.getName().equals(jobStateFileName)) {
-            SerializationUtils
-                .deserializeStateFromInputStream(closer.register(new FileInputStream(dcPath.toUri().getPath())),
-                    this.jobState);
+            SerializationUtils.deserializeStateFromInputStream(
+                closer.register(new FileInputStream(dcPath.toUri().getPath())), this.jobState);
             foundStateFile = true;
             break;
           }
@@ -607,14 +642,14 @@ public class MRJobLauncher extends AbstractJobLauncher {
       if (Boolean.valueOf(
           configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
         this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
-        this.jobMetrics.get().startMetricReportingWithFileSuffix(HadoopUtils.getStateFromConf(configuration),
-            context.getTaskAttemptID().toString());
+        this.jobMetrics.get()
+            .startMetricReportingWithFileSuffix(HadoopUtils.getStateFromConf(configuration),
+                context.getTaskAttemptID().toString());
       }
     }
 
     @Override
-    public void run(Context context)
-        throws IOException, InterruptedException {
+    public void run(Context context) throws IOException, InterruptedException {
       this.setup(context);
       GobblinMultiTaskAttempt gobblinMultiTaskAttempt = null;
       try {
@@ -626,15 +661,19 @@ public class MRJobLauncher extends AbstractJobLauncher {
             isSpeculativeEnabled ? GobblinMultiTaskAttempt.CommitPolicy.CUSTOMIZED
                 : GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE;
 
-        SharedResourcesBroker<GobblinScopeTypes> globalBroker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
-            ConfigFactory.parseProperties(this.jobState.getProperties()), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+        SharedResourcesBroker<GobblinScopeTypes> globalBroker =
+            SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+                ConfigFactory.parseProperties(this.jobState.getProperties()),
+                GobblinScopeTypes.GLOBAL.defaultScopeInstance());
         SharedResourcesBroker<GobblinScopeTypes> jobBroker =
-            globalBroker.newSubscopedBuilder(new JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId())).build();
+            globalBroker.newSubscopedBuilder(new JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId()))
+                .build();
 
         // Actually run the list of WorkUnits
         gobblinMultiTaskAttempt =
-            GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), context.getTaskAttemptID().toString(), this.jobState, this.workUnits,
-                this.taskStateTracker, this.taskExecutor, this.taskStateStore, multiTaskAttemptCommitPolicy, jobBroker);
+            GobblinMultiTaskAttempt.runWorkUnits(this.jobState.getJobId(), context.getTaskAttemptID().toString(),
+                this.jobState, this.workUnits, this.taskStateTracker, this.taskExecutor, this.taskStateStore,
+                multiTaskAttemptCommitPolicy, jobBroker);
 
         if (this.isSpeculativeEnabled) {
           LOG.info("will not commit in task attempt");
@@ -646,14 +685,12 @@ public class MRJobLauncher extends AbstractJobLauncher {
         CommitStep cleanUpCommitStep = new CommitStep() {
 
           @Override
-          public boolean isCompleted()
-              throws IOException {
+          public boolean isCompleted() throws IOException {
             return !serviceManager.isHealthy();
           }
 
           @Override
-          public void execute()
-              throws IOException {
+          public void execute() throws IOException {
             LOG.info("Starting the clean-up steps.");
             try {
               serviceManager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
@@ -682,8 +719,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     }
 
     @Override
-    public void map(LongWritable key, Text value, Context context)
-        throws IOException, InterruptedException {
+    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       WorkUnit workUnit = (value.toString().endsWith(MULTI_WORK_UNIT_FILE_EXTENSION) ? MultiWorkUnit.createEmpty()
           : WorkUnit.createEmpty());
       SerializationUtils.deserializeState(this.fs, new Path(value.toString()), workUnit);


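The heart of the change is the copy loop in addJars above: copyFromLocalFile is invoked with overwrite disabled, a destination whose size does not yet match the local jar is treated as an upload still in progress by another job, and failures are retried up to the configured limit. Below is a minimal standalone sketch of that pattern against the stock Hadoop FileSystem API; the class and helper names are hypothetical, not part of the commit:

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical helper sketching the bounded-retry, no-overwrite jar copy. */
public class JarCopyRetrySketch {

  private static final int MAX_RETRIES = 5;      // mirrors MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT
  private static final long WAIT_MILLIS = 3000L; // mirrors WAITING_TIME_ON_IMCOMPLETE_UPLOAD

  /**
   * Copies a local jar to HDFS without overwriting, retrying while a
   * concurrent upload is incomplete. Returns true once the jar is complete
   * on HDFS, false if the retry budget is exhausted (the caller then skips
   * adding the jar to the DistributedCache).
   */
  public static boolean copyJarWithRetry(FileSystem fs, FileStatus localJar, Path destJarFile)
      throws InterruptedException {
    int retryCount = 0;
    // "Complete" means the destination exists and matches the local jar's length.
    while (!safeExists(fs, destJarFile) || safeLen(fs, destJarFile) != localJar.getLen()) {
      try {
        if (safeExists(fs, destJarFile)) {
          // Destination exists but is short: assume another job is mid-upload; wait, then re-check.
          Thread.sleep(WAIT_MILLIS);
          throw new IOException("Waiting for concurrent upload of " + destJarFile);
        }
        // delSrc=false keeps the local jar; overwrite=false makes the copy fail with an
        // IOException if the destination appears between the existence check and the copy.
        fs.copyFromLocalFile(false, false, localJar.getPath(), destJarFile);
      } catch (IOException e) {
        if (++retryCount >= MAX_RETRIES) {
          return false;
        }
      }
    }
    return true;
  }

  // FileSystem.exists/getFileStatus throw IOException; fold that into the loop condition.
  private static boolean safeExists(FileSystem fs, Path p) {
    try {
      return fs.exists(p);
    } catch (IOException e) {
      return false;
    }
  }

  private static long safeLen(FileSystem fs, Path p) {
    try {
      return fs.getFileStatus(p).getLen();
    } catch (IOException e) {
      return -1L;
    }
  }
}

A related design choice is preserved by the refactored calculateDestJarFile: SNAPSHOT jars go to a per-job unshared directory, since different jobs may run different versions, while release jars land in the shared directory where this retry protocol mediates concurrent uploads.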