gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-257] Remove jobs' previous run data
Date Fri, 22 Sep 2017 19:55:47 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 5fa983268 -> 6e4a2cecb


[GOBBLIN-257] Remove jobs' previous run data

Closes #2109 from arjun4084346/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6e4a2cec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6e4a2cec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6e4a2cec

Branch: refs/heads/master
Commit: 6e4a2cecbec7bf8a16cc9d5e460c5365fd0bd856
Parents: 5fa9832
Author: Arjun <abora@linkedin.com>
Authored: Fri Sep 22 12:55:36 2017 -0700
Committer: Abhishek Tiwari <abhishektiwari.btech@gmail.com>
Committed: Fri Sep 22 12:55:36 2017 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  2 +
 .../gobblin/runtime/AbstractJobLauncher.java    | 15 +++++-
 .../org/apache/gobblin/runtime/JobContext.java  | 26 ++++++++--
 .../apache/gobblin/runtime/JobContextTest.java  | 50 ++++++++++++++++++++
 .../org/apache/gobblin/util/HadoopUtils.java    | 16 +++++++
 .../apache/gobblin/util/JobLauncherUtils.java   | 25 ++++++++++
 6 files changed, 129 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 303ad15..0ef8416 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -180,6 +180,8 @@ public class ConfigurationKeys {
   public static final String CLEANUP_STAGING_DATA_PER_TASK = "cleanup.staging.data.per.task";
   public static final boolean DEFAULT_CLEANUP_STAGING_DATA_PER_TASK = true;
   public static final String CLEANUP_STAGING_DATA_BY_INITIALIZER = "cleanup.staging.data.by.initializer";
+  public static final String CLEANUP_OLD_JOBS_DATA = "cleanup.old.job.data";
+  public static final boolean DEFAULT_CLEANUP_OLD_JOBS_DATA = false;
 
   public static final String QUEUED_TASK_TIME_MAX_SIZE = "taskexecutor.queued_task_time.history.max_size";
   public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 49fa98e..d59e097 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.runtime;
 
 import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -26,6 +28,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.WriterUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -48,7 +54,6 @@ import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
-import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.CommitSequence;
 import org.apache.gobblin.commit.CommitSequenceStore;
@@ -830,6 +835,9 @@ public abstract class AbstractJobLauncher implements JobLauncher {
           throw new RuntimeException("Work unit streams do not support cleaning staging data per task.");
         }
       } else {
+        if (jobState.getPropAsBoolean(ConfigurationKeys.CLEANUP_OLD_JOBS_DATA, ConfigurationKeys.DEFAULT_CLEANUP_OLD_JOBS_DATA)) {
+          JobLauncherUtils.cleanUpOldJobData(jobState, LOG, jobContext.getStagingDirProvided(), jobContext.getOutputDirProvided());
+        }
         JobLauncherUtils.cleanJobStagingData(jobState, LOG);
       }
     } catch (Throwable t) {
@@ -838,6 +846,11 @@ public abstract class AbstractJobLauncher implements JobLauncher {
     }
   }
 
+
+
+  private static String getJobIdPrefix(String jobId) {
+    return jobId.substring(0, jobId.lastIndexOf(Id.Job.SEPARATOR) + 1);
+  }
   /**
    * Cleanup the job's task staging data. This is not doing anything in case job succeeds
    * and data is successfully committed because the staging data has already been moved

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 33c5701..0debcac 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -109,6 +109,12 @@ public class JobContext implements Closeable {
   private final boolean parallelizeCommit;
   private final int parallelCommits;
 
+  // Were WRITER_STAGING_DIR and WRITER_OUTPUT_DIR provided in the job file
+  @Getter
+  protected final Boolean stagingDirProvided;
+  @Getter
+  protected final Boolean outputDirProvided;
+
   @Getter
   private final DeliverySemantics semantics;
 
@@ -147,6 +153,9 @@ public class JobContext implements Closeable {
         new JobState(jobPropsState, this.datasetStateStore.getLatestDatasetStatesByUrns(this.jobName), this.jobName,
             this.jobId);
 
+    stagingDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_STAGING_DIR);
+    outputDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR);
+
     setTaskStagingAndOutputDirs();
 
     if (GobblinMetrics.isEnabled(jobProps)) {
@@ -287,26 +296,35 @@ public class JobContext implements Closeable {
     return this.source;
   }
 
-  protected void setTaskStagingAndOutputDirs() {
+  /**
+   * Appends two paths
+   * @param dir1
+   * @param dir2
+   * @return appended path
+   */
+  protected static Path getJobDir(String dir1, String dir2) {
+    return new Path(dir1, dir2);
+  }
 
+  protected void setTaskStagingAndOutputDirs() {
     // Add jobId to writer staging dir
     if (this.jobState.contains(ConfigurationKeys.WRITER_STAGING_DIR)) {
       String writerStagingDirWithJobId =
-          new Path(this.jobState.getProp(ConfigurationKeys.WRITER_STAGING_DIR), this.jobId).toString();
+          new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_STAGING_DIR), this.getJobName()), this.jobId).toString();
       this.jobState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, writerStagingDirWithJobId);
     }
 
     // Add jobId to writer output dir
     if (this.jobState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
       String writerOutputDirWithJobId =
-          new Path(this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), this.jobId).toString();
+          new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), this.getJobName()), this.jobId).toString();
       this.jobState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, writerOutputDirWithJobId);
     }
 
     // Add jobId to task data root dir
     if (this.jobState.contains(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY)) {
       String taskDataRootDirWithJobId =
-          new Path(this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY), this.jobId).toString();
+          new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY), this.getJobName()), this.jobId).toString();
       this.jobState.setProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY, taskDataRootDirWithJobId);
 
       setTaskStagingDir();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java
index d3dba5b..a21a0d8 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java
@@ -29,18 +29,27 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import org.slf4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
+import com.google.common.io.Files;
 
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.Id;
 
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
@@ -214,6 +223,47 @@ public class JobContextTest {
     Assert.assertEquals(jobContext.getJobState().getState(), JobState.RunningState.FAILED);
   }
 
+  @Test
+  public void testCleanUpOldJobData() throws Exception {
+    String rootPath = Files.createTempDir().getAbsolutePath();
+    final String JOB_PREFIX = Id.Job.PREFIX;
+    final String JOB_NAME1 = "GobblinKafka";
+    final String JOB_NAME2 = "GobblinBrooklin";
+    final long timestamp1 = 1505774129247L;
+    final long timestamp2 = 1505774129248L;
+    final Joiner JOINER = Joiner.on(Id.SEPARATOR).skipNulls();
+    Object[] oldJob1 = new Object[]{JOB_PREFIX, JOB_NAME1, timestamp1};
+    Object[] oldJob2 = new Object[]{JOB_PREFIX, JOB_NAME2, timestamp1};
+    Object[] currentJob = new Object[]{JOB_PREFIX, JOB_NAME1, timestamp2};
+
+    Path currentJobPath = new Path(JobContext.getJobDir(rootPath, JOB_NAME1), JOINER.join(currentJob));
+    Path oldJobPath1 = new Path(JobContext.getJobDir(rootPath, JOB_NAME1), JOINER.join(oldJob1));
+    Path oldJobPath2 = new Path(JobContext.getJobDir(rootPath, JOB_NAME2), JOINER.join(oldJob2));
+    Path stagingPath = new Path(currentJobPath, "task-staging");
+    Path outputPath = new Path(currentJobPath, "task-output");
+
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    fs.mkdirs(currentJobPath);
+    fs.mkdirs(oldJobPath1);
+    fs.mkdirs(oldJobPath2);
+    log.info("Created : {} {} {}", currentJobPath, oldJobPath1, oldJobPath2);
+
+    gobblin.runtime.JobState jobState = new gobblin.runtime.JobState();
+    jobState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingPath.toString());
+    jobState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputPath.toString());
+
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getStagingDirProvided()).thenReturn(false);
+    when(jobContext.getOutputDirProvided()).thenReturn(false);
+    when(jobContext.getJobId()).thenReturn(currentJobPath.getName().toString());
+
+    JobLauncherUtils.cleanUpOldJobData(jobState, log, jobContext.getStagingDirProvided(), jobContext.getOutputDirProvided());
+
+    Assert.assertFalse(fs.exists(oldJobPath1));
+    Assert.assertTrue(fs.exists(oldJobPath2));
+    Assert.assertFalse(fs.exists(currentJobPath));
+  }
+
   /**
    * A {@link Callable} that blocks until a different thread calls {@link #unblock()}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
index 27ec5cd..4b01ab4 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -185,6 +186,21 @@ public class HadoopUtils {
   }
 
   /**
+   * Delete files according to the regular expression provided
+   * @param fs Filesystem object
+   * @param path base path
+   * @param regex regular expression to select files to delete
+   * @throws IOException
+   */
+  public static void deletePathByRegex(FileSystem fs, final Path path, final String regex) throws IOException {
+    FileStatus[] statusList = fs.listStatus(path, path1 -> path1.getName().matches(regex));
+
+    for (final FileStatus oldJobFile : statusList) {
+      HadoopUtils.deletePath(fs, oldJobFile.getPath(), true);
+    }
+  }
+
+  /**
    * Moves the object to the filesystem trash according to the file system policy.
    * @param fs FileSystem object
    * @param path Path to the object to be moved to trash.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6e4a2cec/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index df4d73a..45eb82d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -20,8 +20,10 @@ package org.apache.gobblin.util;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
@@ -236,6 +238,29 @@ public class JobLauncherUtils {
     }
   }
 
+  public static void cleanUpOldJobData(State state, Logger logger, boolean stagingDirProvided, boolean outputDirProvided) throws IOException {
+    Set<Path> jobPaths = new HashSet<>();
+    String writerFsUri = state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI);
+    FileSystem fs = FileSystem.get(URI.create(writerFsUri), WriterUtils.getFsConfiguration(state));
+
+    Path jobPath;
+    if (stagingDirProvided) {
+      jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR)).getParent();
+    } else {
+      jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR)).getParent().getParent();
+    }
+    jobPaths.add(jobPath);
+    if (outputDirProvided) {
+      jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)).getParent();
+    } else {
+      jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)).getParent().getParent();
+    }
+    jobPaths.add(jobPath);
+    for (Path jobPathToDelete : jobPaths) {
+      logger.info("Cleaning up old job directory " + jobPathToDelete);
+      HadoopUtils.deletePath(fs, jobPathToDelete, true);
+    }
+  }
   /**
    * @param state
    * @param fsUri


Mime
View raw message