gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-244] Need additional info for gobblin tracking hourly-deduped
Date Mon, 11 Sep 2017 17:13:37 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master afdba01f9 -> 2485282d2


[GOBBLIN-244] Need additional info for gobblin tracking hourly-deduped

Closes #2094 from yukuai518/compaction


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2485282d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2485282d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2485282d

Branch: refs/heads/master
Commit: 2485282d2b1331b42da7dcd9ab6f1f50306fe3ce
Parents: afdba01
Author: Kuai Yu <kuyu@linkedin.com>
Authored: Mon Sep 11 10:13:31 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Mon Sep 11 10:13:31 2017 -0700

----------------------------------------------------------------------
 .../CompactionCompleteFileOperationAction.java  | 18 ++--
 .../event/CompactionSlaEventHelper.java         |  2 +
 .../verify/CompactionThresholdVerifier.java     |  2 +-
 .../verify/InputRecordCountHelper.java          | 97 +++++++++++++++++---
 4 files changed, 99 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2485282d/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 713fd32..ceddb0d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -87,8 +87,8 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
      // We are not getting record count from map-reduce counter because in next run, the threshold (delta record)
       // calculation is based on the input file names.
       long newTotalRecords = 0;
-      long oldTotalRecords = InputRecordCountHelper.readRecordCount (helper.getFs(), new Path (result.getDstAbsoluteDir()));
-
+      long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir()));
+      long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir()));
       if (appendDeltaOutput) {
         FsPermission permission = HadoopUtils.deserializeFsPermission(this.state,
                 MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
@@ -126,15 +126,21 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
         newTotalRecords = counter.getValue();
       }
 
+      State compactState = helper.loadState(new Path (result.getDstAbsoluteDir()));
+      compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
+      compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
+      helper.saveState(new Path (result.getDstAbsoluteDir()), compactState);
+
+      log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath, executeCount + 1);
+
       // submit events for record count
       if (eventSubmitter != null) {
         Map<String, String> eventMetadataMap = ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
-            CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
+            CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords),
+            CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords),
+            CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
         this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap);
       }
-
-      InputRecordCountHelper.writeRecordCount (helper.getFs(), new Path (result.getDstAbsoluteDir()), newTotalRecords);
-      log.info("Updating record count from {} to {} in {} ", oldTotalRecords, newTotalRecords, dstPath);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2485282d/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
index b0c4dcb..5d89fd8 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
@@ -51,6 +51,8 @@ public class CompactionSlaEventHelper {
   public static final String LATE_RECORD_COUNT = "lateRecordCount";
   public static final String REGULAR_RECORD_COUNT = "regularRecordCount";
   public static final String NEED_RECOMPACT = "needRecompact";
+  public static final String PREV_RECORD_COUNT_TOTAL = "prevRecordCountTotal";
+  public static final String EXEC_COUNT_TOTAL = "executionCountTotal";
   public static final String RECORD_COUNT_TOTAL = "recordCountTotal";
   public static final String HIVE_REGISTRATION_PATHS = "hiveRegistrationPaths";
   public static final String RENAME_DIR_PATHS = "renameDirPaths";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2485282d/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index 27bc6f0..fbd6413 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -73,7 +73,7 @@ public class CompactionThresholdVerifier implements CompactionVerifier<FileSyste
     InputRecordCountHelper helper = new InputRecordCountHelper(state);
     try {
       double newRecords = helper.calculateRecordCount (Lists.newArrayList(new Path(dataset.datasetURN())));
-      double oldRecords = InputRecordCountHelper.readRecordCount (helper.getFs(), new Path(result.getDstAbsoluteDir()));
+      double oldRecords = helper.readRecordCount (new Path(result.getDstAbsoluteDir()));
 
       log.info ("Dataset {} : previous records {}, current records {}", dataset.datasetURN(), oldRecords, newRecords);
       if (oldRecords == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2485282d/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
index 661fa47..de95255 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.compaction.verify;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import org.apache.gobblin.compaction.dataset.DatasetHelper;
+import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -28,11 +29,12 @@ import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.io.BufferedReader;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.URI;
@@ -54,8 +56,11 @@ public class InputRecordCountHelper {
   private final RecordCountProvider inputRecordCountProvider;
   private final String AVRO = "avro";
 
+  @Deprecated
   public final static String RECORD_COUNT_FILE = "_record_count";
 
+  public final static String STATE_FILE = "_state_file";
+
   /**
    * Constructor
    */
@@ -86,33 +91,99 @@ public class InputRecordCountHelper {
   }
 
   /**
+   * Load compaction state file
+   */
+  public State loadState (Path dir) throws IOException {
+    return loadState(this.fs, dir);
+  }
+
+  private static State loadState (FileSystem fs, Path dir) throws IOException {
+    State state = new State();
+    if (fs.exists(new Path(dir, STATE_FILE))) {
+      try (FSDataInputStream inputStream = fs.open(new Path(dir, STATE_FILE))) {
+        state.readFields(inputStream);
+      }
+    }
+    return state;
+  }
+
+  /**
+   * Save compaction state file
+   */
+  public void saveState (Path dir, State state) throws IOException {
+    saveState(this.fs, dir, state);
+  }
+
+  private static void saveState (FileSystem fs, Path dir, State state) throws IOException {
+    Path tmpFile = new Path(dir, STATE_FILE + ".tmp");
+    Path newFile = new Path(dir, STATE_FILE);
+    fs.delete(tmpFile, false);
+    try (DataOutputStream dataOutputStream = new DataOutputStream(fs.create(new Path(dir, STATE_FILE + ".tmp")))) {
+      state.write(dataOutputStream);
+    }
+
+    // Caution: We are deleting right before renaming because rename doesn't support atomic overwrite options from FileSystem API.
+    fs.delete(newFile, false);
+    fs.rename(tmpFile, newFile);
+  }
+
+  /**
    * Read record count from a specific directory.
-   * File name is {@link InputRecordCountHelper#RECORD_COUNT_FILE}
+   * File name is {@link InputRecordCountHelper#STATE_FILE}
+   * @param dir directory where a state file is located
+   * @return record count
+   */
+  public long readRecordCount (Path dir) throws IOException {
+    return readRecordCount(this.fs, dir);
+  }
+
+  /**
+   * Read record count from a specific directory.
+   * File name is {@link InputRecordCountHelper#STATE_FILE}
    * @param fs  file system in use
-   * @param dir directory where a record file will be read
+   * @param dir directory where a state file is located
    * @return record count
    */
+  @Deprecated
   public static long readRecordCount (FileSystem fs, Path dir) throws IOException {
-    if (!fs.exists(new Path(dir, RECORD_COUNT_FILE))) {
-      return 0;
-    }
+    State state = loadState(fs, dir);
 
-    try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open (new Path (dir, RECORD_COUNT_FILE)), Charsets.UTF_8))) {
-      long count = Long.parseLong(br.readLine());
-      return count;
+    if (!state.contains(CompactionSlaEventHelper.RECORD_COUNT_TOTAL)) {
+      if (fs.exists(new Path (dir, RECORD_COUNT_FILE))){
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open (new Path (dir, RECORD_COUNT_FILE)), Charsets.UTF_8))) {
+          long count = Long.parseLong(br.readLine());
+          return count;
+        }
+      } else {
+        return 0;
+      }
+    } else {
+      return Long.parseLong(state.getProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL));
     }
   }
 
   /**
+   * Read execution count from a specific directory.
+   * File name is {@link InputRecordCountHelper#STATE_FILE}
+   * @param dir directory where a state file is located
+   * @return record count
+   */
+  public long readExecutionCount (Path dir) throws IOException {
+    State state = loadState(fs, dir);
+    return Long.parseLong(state.getProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, "0"));
+  }
+
+  /**
    * Write record count to a specific directory.
    * File name is {@link InputRecordCountHelper#RECORD_COUNT_FILE}
    * @param fs file system in use
-   * @param dir directory where a record file will be saved
+   * @param dir directory where a record file is located
    */
+  @Deprecated
   public static void writeRecordCount (FileSystem fs, Path dir, long count) throws IOException {
-    try (FSDataOutputStream outputFileStream = fs.create(new Path(dir, RECORD_COUNT_FILE))) {
-      outputFileStream.writeBytes(Long.toString(count));
-    }
+     State state = loadState(fs, dir);
+     state.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, count);
+     saveState(fs, dir, state);
   }
 
   protected FileSystem getSourceFileSystem (State state)


Mime
View raw message