hudi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vin...@apache.org
Subject [incubator-hudi] branch master updated: [MINOR] Optimize hudi-cli module (#1136)
Date Sat, 04 Jan 2020 17:05:57 GMT
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a733f4e  [MINOR] Optimize hudi-cli module (#1136)
a733f4e is described below

commit a733f4ef723865738d8541282c0c7234d64668db
Author: SteNicholas <programgeek@163.com>
AuthorDate: Sun Jan 5 01:05:50 2020 +0800

    [MINOR] Optimize hudi-cli module (#1136)
---
 .../org/apache/hudi/cli/HoodiePrintHelper.java     |  2 +-
 .../src/main/java/org/apache/hudi/cli/Table.java   | 36 ++++++---------
 .../hudi/cli/commands/ArchivedCommitsCommand.java  | 53 +++++++++-------------
 .../apache/hudi/cli/commands/CleansCommand.java    | 13 +++---
 .../apache/hudi/cli/commands/CommitsCommand.java   | 18 +++-----
 .../hudi/cli/commands/CompactionCommand.java       | 31 ++++++-------
 .../apache/hudi/cli/commands/DatasetsCommand.java  | 12 ++---
 .../hudi/cli/commands/FileSystemViewCommand.java   | 17 +++----
 .../cli/commands/HDFSParquetImportCommand.java     |  4 +-
 .../hudi/cli/commands/HoodieLogFileCommand.java    | 29 ++++++------
 .../hudi/cli/commands/HoodieSyncCommand.java       | 45 ++++++++----------
 .../apache/hudi/cli/commands/RepairsCommand.java   |  2 +-
 .../apache/hudi/cli/commands/RollbacksCommand.java | 26 +++++------
 .../hudi/cli/commands/SavepointsCommand.java       |  4 +-
 .../org/apache/hudi/cli/commands/SparkMain.java    |  6 +--
 .../org/apache/hudi/cli/commands/StatsCommand.java |  8 ++--
 .../java/org/apache/hudi/cli/utils/SparkUtil.java  | 12 ++---
 17 files changed, 142 insertions(+), 176 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
index 5325432..53114ce 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
@@ -84,7 +84,7 @@ public class HoodiePrintHelper {
     buffer.getFieldNames().toArray(header);
 
     String[][] rows =
-        buffer.getRenderRows().stream().map(l -> l.stream().toArray(String[]::new)).toArray(String[][]::new);
+        buffer.getRenderRows().stream().map(l -> l.toArray(new String[l.size()])).toArray(String[][]::new);
     return printTextTable(header, rows);
   }
 
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java b/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
index bebc7fc..8158eef 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
@@ -22,7 +22,6 @@ import org.apache.hudi.common.util.Option;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -89,7 +88,7 @@ public class Table implements Iterable<List<String>> {
    * @return
    */
   public Table addAll(List<List<Comparable>> rows) {
-    rows.forEach(r -> add(r));
+    rows.forEach(this::add);
     return this;
   }
 
@@ -120,16 +119,11 @@ public class Table implements Iterable<List<String>> {
    */
   private List<List<Comparable>> orderRows() {
     return orderingFieldNameOptional.map(orderingColumnName -> {
-      return rawRows.stream().sorted(new Comparator<List<Comparable>>() {
-        @Override
-        public int compare(List<Comparable> row1, List<Comparable> row2) {
-          Comparable fieldForRow1 = row1.get(rowHeader.indexOf(orderingColumnName));
-          Comparable fieldForRow2 = row2.get(rowHeader.indexOf(orderingColumnName));
-          int cmpRawResult = fieldForRow1.compareTo(fieldForRow2);
-          return isDescendingOptional.map(isDescending -> {
-            return isDescending ? -1 * cmpRawResult : cmpRawResult;
-          }).orElse(cmpRawResult);
-        }
+      return rawRows.stream().sorted((row1, row2) -> {
+        Comparable fieldForRow1 = row1.get(rowHeader.indexOf(orderingColumnName));
+        Comparable fieldForRow2 = row2.get(rowHeader.indexOf(orderingColumnName));
+        int cmpRawResult = fieldForRow1.compareTo(fieldForRow2);
+        return isDescendingOptional.map(isDescending -> isDescending ? -1 * cmpRawResult : cmpRawResult).orElse(cmpRawResult);
       }).collect(Collectors.toList());
     }).orElse(rawRows);
   }
@@ -141,16 +135,14 @@ public class Table implements Iterable<List<String>> {
     this.renderRows = new ArrayList<>();
     final int limit = this.limitOptional.orElse(rawRows.size());
     final List<List<Comparable>> orderedRows = orderRows();
-    renderRows = orderedRows.stream().limit(limit).map(row -> {
-      return IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
-        String fieldName = rowHeader.get(idx);
-        if (fieldNameToConverterMap.containsKey(fieldName)) {
-          return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
-        }
-        Object v = row.get(idx);
-        return v == null ? "null" : v.toString();
-      }).collect(Collectors.toList());
-    }).collect(Collectors.toList());
+    renderRows = orderedRows.stream().limit(limit).map(row -> IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
+      String fieldName = rowHeader.get(idx);
+      if (fieldNameToConverterMap.containsKey(fieldName)) {
+        return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
+      }
+      Object v = row.get(idx);
+      return v == null ? "null" : v.toString();
+    }).collect(Collectors.toList())).collect(Collectors.toList());
   }
 
   @Override
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index c8f1dc8..f455504 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -89,29 +89,27 @@ public class ArchivedCommitsCommand implements CommandMarker {
                 .deepCopy(HoodieCommitMetadata.SCHEMA$, r.get("hoodieCommitMetadata"));
             final String instantTime = r.get("commitTime").toString();
             final String action = r.get("actionType").toString();
-            return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> {
-              return hoodieWriteStats.stream().map(hoodieWriteStat -> {
-                List<Comparable> row = new ArrayList<>();
-                row.add(action);
-                row.add(instantTime);
-                row.add(hoodieWriteStat.getPartitionPath());
-                row.add(hoodieWriteStat.getFileId());
-                row.add(hoodieWriteStat.getPrevCommit());
-                row.add(hoodieWriteStat.getNumWrites());
-                row.add(hoodieWriteStat.getNumInserts());
-                row.add(hoodieWriteStat.getNumDeletes());
-                row.add(hoodieWriteStat.getNumUpdateWrites());
-                row.add(hoodieWriteStat.getTotalLogFiles());
-                row.add(hoodieWriteStat.getTotalLogBlocks());
-                row.add(hoodieWriteStat.getTotalCorruptLogBlock());
-                row.add(hoodieWriteStat.getTotalRollbackBlocks());
-                row.add(hoodieWriteStat.getTotalLogRecords());
-                row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
-                row.add(hoodieWriteStat.getTotalWriteBytes());
-                row.add(hoodieWriteStat.getTotalWriteErrors());
-                return row;
-              });
-            }).map(rowList -> rowList.toArray(new Comparable[0]));
+            return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> hoodieWriteStats.stream().map(hoodieWriteStat -> {
+              List<Comparable> row = new ArrayList<>();
+              row.add(action);
+              row.add(instantTime);
+              row.add(hoodieWriteStat.getPartitionPath());
+              row.add(hoodieWriteStat.getFileId());
+              row.add(hoodieWriteStat.getPrevCommit());
+              row.add(hoodieWriteStat.getNumWrites());
+              row.add(hoodieWriteStat.getNumInserts());
+              row.add(hoodieWriteStat.getNumDeletes());
+              row.add(hoodieWriteStat.getNumUpdateWrites());
+              row.add(hoodieWriteStat.getTotalLogFiles());
+              row.add(hoodieWriteStat.getTotalLogBlocks());
+              row.add(hoodieWriteStat.getTotalCorruptLogBlock());
+              row.add(hoodieWriteStat.getTotalRollbackBlocks());
+              row.add(hoodieWriteStat.getTotalLogRecords());
+              row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
+              row.add(hoodieWriteStat.getTotalWriteBytes());
+              row.add(hoodieWriteStat.getTotalWriteErrors());
+              return row;
+            })).map(rowList -> rowList.toArray(new Comparable[0]));
           }).collect(Collectors.toList());
       allStats.addAll(readCommits);
       reader.close();
@@ -183,14 +181,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
           }
           break;
         }
-        case HoodieTimeline.COMMIT_ACTION: {
-          commitDetails.add(record.get("commitTime"));
-          commitDetails.add(record.get("actionType").toString());
-          if (!skipMetadata) {
-            commitDetails.add(record.get("hoodieCommitMetadata").toString());
-          }
-          break;
-        }
+        case HoodieTimeline.COMMIT_ACTION:
         case HoodieTimeline.DELTA_COMMIT_ACTION: {
           commitDetails.add(record.get("commitTime"));
           commitDetails.add(record.get("actionType").toString());
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
index 857fb0d..7c30498 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
@@ -66,12 +66,11 @@ public class CleansCommand implements CommandMarker {
     HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
     List<HoodieInstant> cleans = timeline.getReverseOrderedInstants().collect(Collectors.toList());
     List<Comparable[]> rows = new ArrayList<>();
-    for (int i = 0; i < cleans.size(); i++) {
-      HoodieInstant clean = cleans.get(i);
+    for (HoodieInstant clean : cleans) {
       HoodieCleanMetadata cleanMetadata =
-          AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
-      rows.add(new Comparable[] {clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
-          cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
+              AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
+      rows.add(new Comparable[]{clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
+              cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
     }
 
     TableHeader header =
@@ -110,8 +109,8 @@ public class CleansCommand implements CommandMarker {
       String path = entry.getKey();
       HoodieCleanPartitionMetadata stats = entry.getValue();
       String policy = stats.getPolicy();
-      Integer totalSuccessDeletedFiles = stats.getSuccessDeleteFiles().size();
-      Integer totalFailedDeletedFiles = stats.getFailedDeleteFiles().size();
+      int totalSuccessDeletedFiles = stats.getSuccessDeleteFiles().size();
+      int totalFailedDeletedFiles = stats.getFailedDeleteFiles().size();
       rows.add(new Comparable[] {path, policy, totalSuccessDeletedFiles, totalFailedDeletedFiles});
     }
 
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 089f6f4..c5b2faa 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -53,7 +53,7 @@ public class CommitsCommand implements CommandMarker {
 
   @CliCommand(value = "commits show", help = "Show the commits")
   public String showCommits(
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
           unspecifiedDefaultValue = "-1") final Integer limit,
       @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
       @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -65,8 +65,7 @@ public class CommitsCommand implements CommandMarker {
     HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
     List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
     List<Comparable[]> rows = new ArrayList<>();
-    for (int i = 0; i < commits.size(); i++) {
-      HoodieInstant commit = commits.get(i);
+    for (HoodieInstant commit : commits) {
       HoodieCommitMetadata commitMetadata =
           HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
       rows.add(new Comparable[] {commit.getTimestamp(), commitMetadata.fetchTotalBytesWritten(),
@@ -76,9 +75,7 @@ public class CommitsCommand implements CommandMarker {
     }
 
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    fieldNameToConverterMap.put("Total Bytes Written", entry -> {
-      return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
-    });
+    fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString()))));
 
     TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Bytes Written")
         .addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
@@ -95,7 +92,7 @@ public class CommitsCommand implements CommandMarker {
 
   @CliCommand(value = "commit rollback", help = "Rollback a commit")
   public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to rollback") final String commitTime,
-      @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath)
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath)
       throws Exception {
     HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
     HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
@@ -163,9 +160,7 @@ public class CommitsCommand implements CommandMarker {
     }
 
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    fieldNameToConverterMap.put("Total Bytes Written", entry -> {
-      return NumericUtils.humanReadableByteCount((Long.valueOf(entry.toString())));
-    });
+    fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
 
     TableHeader header = new TableHeader().addTableHeaderField("Partition Path")
         .addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
@@ -240,8 +235,7 @@ public class CommitsCommand implements CommandMarker {
   }
 
   @CliCommand(value = "commits sync", help = "Compare commits with another Hoodie dataset")
-  public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path)
-      throws Exception {
+  public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path) {
     HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, path);
     HoodieCLI.state = HoodieCLI.CLIState.SYNC;
     return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 6a188c1..3a518ee 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -38,11 +38,12 @@ import org.apache.hudi.common.util.AvroUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.func.OperationResult;
+import org.apache.hudi.utilities.UtilHelpers;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.launcher.SparkLauncher;
@@ -85,7 +86,7 @@ public class CompactionCommand implements CommandMarker {
   public String compactionsAll(
       @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
           unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
           unspecifiedDefaultValue = "-1") final Integer limit,
       @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
       @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -100,10 +101,9 @@ public class CompactionCommand implements CommandMarker {
 
     List<HoodieInstant> instants = timeline.getReverseOrderedInstants().collect(Collectors.toList());
     List<Comparable[]> rows = new ArrayList<>();
-    for (int i = 0; i < instants.size(); i++) {
-      HoodieInstant instant = instants.get(i);
+    for (HoodieInstant instant : instants) {
       HoodieCompactionPlan compactionPlan = null;
-      if (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) {
+      if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
         try {
           // This could be a completed compaction. Assume a compaction request file is present but skip if fails
           compactionPlan = AvroUtils.deserializeCompactionPlan(
@@ -118,7 +118,7 @@ public class CompactionCommand implements CommandMarker {
       }
 
       if (null != compactionPlan) {
-        HoodieInstant.State state = instant.getState();
+        State state = instant.getState();
         if (committed.contains(instant.getTimestamp())) {
           state = State.COMPLETED;
         }
@@ -146,7 +146,7 @@ public class CompactionCommand implements CommandMarker {
   public String compactionShow(
       @CliOption(key = "instant", mandatory = true,
           help = "Base path for the target hoodie dataset") final String compactionInstantTime,
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
           unspecifiedDefaultValue = "-1") final Integer limit,
       @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
       @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -212,8 +212,7 @@ public class CompactionCommand implements CommandMarker {
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
           help = "Spark executor memory") final String sparkMemory,
       @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
-      @CliOption(key = "compactionInstant", mandatory = false,
-          help = "Base path for the target hoodie dataset") String compactionInstantTime,
+      @CliOption(key = "compactionInstant", help = "Base path for the target hoodie dataset") String compactionInstantTime,
       @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
         unspecifiedDefaultValue = "") final String propsFilePath,
       @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
@@ -286,7 +285,7 @@ public class CompactionCommand implements CommandMarker {
 
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = null;
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -300,10 +299,10 @@ public class CompactionCommand implements CommandMarker {
         return "Failed to validate compaction for " + compactionInstant;
       }
       List<ValidationOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
-      boolean valid = res.stream().map(r -> r.isSuccess()).reduce(Boolean::logicalAnd).orElse(true);
+      boolean valid = res.stream().map(OperationResult::isSuccess).reduce(Boolean::logicalAnd).orElse(true);
       String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
       List<Comparable[]> rows = new ArrayList<>();
-      res.stream().forEach(r -> {
+      res.forEach(r -> {
         Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(),
             r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "",
             r.getOperation().getDeltaFileNames().size(), r.isSuccess(),
@@ -347,7 +346,7 @@ public class CompactionCommand implements CommandMarker {
 
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = "";
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -391,7 +390,7 @@ public class CompactionCommand implements CommandMarker {
 
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = "";
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -437,7 +436,7 @@ public class CompactionCommand implements CommandMarker {
 
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = "";
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -476,7 +475,7 @@ public class CompactionCommand implements CommandMarker {
       }
 
       List<Comparable[]> rows = new ArrayList<>();
-      res.stream().forEach(r -> {
+      res.forEach(r -> {
         Comparable[] row =
             new Comparable[] {r.getOperation().fileId, r.getOperation().srcPath, r.getOperation().destPath,
                 r.isExecuted(), r.isSuccess(), r.getException().isPresent() ? r.getException().get().getMessage() : ""};
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java
index d5d1e82..302931e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java
@@ -49,14 +49,14 @@ public class DatasetsCommand implements CommandMarker {
   @CliCommand(value = "connect", help = "Connect to a hoodie dataset")
   public String connect(
       @CliOption(key = {"path"}, mandatory = true, help = "Base Path of the dataset") final String path,
-      @CliOption(key = {"layoutVersion"}, mandatory = false, help = "Timeline Layout version") Integer layoutVersion,
-      @CliOption(key = {"eventuallyConsistent"}, mandatory = false, unspecifiedDefaultValue = "false",
+      @CliOption(key = {"layoutVersion"}, help = "Timeline Layout version") Integer layoutVersion,
+      @CliOption(key = {"eventuallyConsistent"}, unspecifiedDefaultValue = "false",
           help = "Enable eventual consistency") final boolean eventuallyConsistent,
-      @CliOption(key = {"initialCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "2000",
+      @CliOption(key = {"initialCheckIntervalMs"}, unspecifiedDefaultValue = "2000",
           help = "Initial wait time for eventual consistency") final Integer initialConsistencyIntervalMs,
-      @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "300000",
+      @CliOption(key = {"maxCheckIntervalMs"}, unspecifiedDefaultValue = "300000",
           help = "Max wait time for eventual consistency") final Integer maxConsistencyIntervalMs,
-      @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "7",
+      @CliOption(key = {"maxCheckIntervalMs"}, unspecifiedDefaultValue = "7",
           help = "Max checks for eventual consistency") final Integer maxConsistencyChecks)
       throws IOException {
     HoodieCLI
@@ -118,7 +118,7 @@ public class DatasetsCommand implements CommandMarker {
   /**
    * Describes table properties.
    */
-  @CliCommand(value = "desc", help = "Describle Hoodie Table properties")
+  @CliCommand(value = "desc", help = "Describe Hoodie Table properties")
   public String descTable() {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     TableHeader header = new TableHeader().addTableHeaderField("Property").addTableHeaderField("Value");
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
index e94e16a..597bab3 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
@@ -22,6 +22,7 @@ import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
@@ -90,13 +91,13 @@ public class FileSystemViewCommand implements CommandMarker {
       row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getFileSize() : -1;
       if (!readOptimizedOnly) {
         row[idx++] = fs.getLogFiles().count();
-        row[idx++] = fs.getLogFiles().mapToLong(lf -> lf.getFileSize()).sum();
+        row[idx++] = fs.getLogFiles().mapToLong(HoodieLogFile::getFileSize).sum();
         row[idx++] = fs.getLogFiles().collect(Collectors.toList()).toString();
       }
       rows.add(row);
     }));
     Function<Object, String> converterFunction =
-        entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+        entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
     fieldNameToConverterMap.put("Total Delta File Size", converterFunction);
     fieldNameToConverterMap.put("Data-File Size", converterFunction);
@@ -160,15 +161,15 @@ public class FileSystemViewCommand implements CommandMarker {
 
       if (!readOptimizedOnly) {
         row[idx++] = fs.getLogFiles().count();
-        row[idx++] = fs.getLogFiles().mapToLong(lf -> lf.getFileSize()).sum();
+        row[idx++] = fs.getLogFiles().mapToLong(HoodieLogFile::getFileSize).sum();
         long logFilesScheduledForCompactionTotalSize =
             fs.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
-                .mapToLong(lf -> lf.getFileSize()).sum();
+                .mapToLong(HoodieLogFile::getFileSize).sum();
         row[idx++] = logFilesScheduledForCompactionTotalSize;
 
         long logFilesUnscheduledTotalSize =
             fs.getLogFiles().filter(lf -> !lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
-                .mapToLong(lf -> lf.getFileSize()).sum();
+                .mapToLong(HoodieLogFile::getFileSize).sum();
         row[idx++] = logFilesUnscheduledTotalSize;
 
         double logSelectedForCompactionToBaseRatio =
@@ -186,7 +187,7 @@ public class FileSystemViewCommand implements CommandMarker {
     });
 
     Function<Object, String> converterFunction =
-        entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+        entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
     fieldNameToConverterMap.put("Data-File Size", converterFunction);
     if (!readOptimizedOnly) {
@@ -230,9 +231,9 @@ public class FileSystemViewCommand implements CommandMarker {
     FileSystem fs = HoodieCLI.fs;
     String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex);
     FileStatus[] statuses = fs.globStatus(new Path(globPath));
-    Stream<HoodieInstant> instantsStream = null;
+    Stream<HoodieInstant> instantsStream;
 
-    HoodieTimeline timeline = null;
+    HoodieTimeline timeline;
     if (readOptimizedOnly) {
       timeline = metaClient.getActiveTimeline().getCommitTimeline();
     } else if (excludeCompaction) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
index 4ba9d3d..4c81475 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
@@ -23,8 +23,8 @@ import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.utilities.HDFSParquetImporter.FormatValidator;
-
 import org.apache.hudi.utilities.UtilHelpers;
+
 import org.apache.spark.launcher.SparkLauncher;
 import org.apache.spark.util.Utils;
 import org.springframework.shell.core.CommandMarker;
@@ -42,7 +42,7 @@ public class HDFSParquetImportCommand implements CommandMarker {
 
   @CliCommand(value = "hdfsparquetimport", help = "Imports Parquet dataset to a hoodie dataset")
   public String convert(
-      @CliOption(key = "upsert", mandatory = false, unspecifiedDefaultValue = "false",
+      @CliOption(key = "upsert", unspecifiedDefaultValue = "false",
           help = "Uses upsert API instead of the default insert API of WriteClient") boolean useUpsert,
       @CliOption(key = "srcPath", mandatory = true, help = "Base path for the input dataset") final String srcPath,
       @CliOption(key = "targetPath", mandatory = true,
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index f9bea47..8a50309 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -38,8 +38,10 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.hive.util.SchemaUtil;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileStatus;
@@ -84,14 +86,13 @@ public class HoodieLogFileCommand implements CommandMarker {
         .map(status -> status.getPath().toString()).collect(Collectors.toList());
     Map<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata =
         Maps.newHashMap();
-    int totalEntries = 0;
     int numCorruptBlocks = 0;
     int dummyInstantTimeCount = 0;
 
     for (String logFilePath : logFilePaths) {
       FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath));
       Schema writerSchema = new AvroSchemaConverter()
-          .convert(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePath)));
+          .convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePath))));
       Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
 
       // read the avro blocks
@@ -124,14 +125,12 @@ public class HoodieLogFileCommand implements CommandMarker {
         if (commitCountAndMetadata.containsKey(instantTime)) {
           commitCountAndMetadata.get(instantTime).add(
               new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
-          totalEntries++;
         } else {
           List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>> list =
               new ArrayList<>();
           list.add(
               new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
           commitCountAndMetadata.put(instantTime, list);
-          totalEntries++;
         }
       }
       reader.close();
@@ -141,7 +140,7 @@ public class HoodieLogFileCommand implements CommandMarker {
     ObjectMapper objectMapper = new ObjectMapper();
     for (Map.Entry<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> entry : commitCountAndMetadata
         .entrySet()) {
-      String instantTime = entry.getKey().toString();
+      String instantTime = entry.getKey();
       for (Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer> tuple3 : entry
           .getValue()) {
         Comparable[] output = new Comparable[5];
@@ -163,11 +162,11 @@ public class HoodieLogFileCommand implements CommandMarker {
 
   @CliCommand(value = "show logfile records", help = "Read records from log files")
   public String showLogFileRecords(
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
           unspecifiedDefaultValue = "10") final Integer limit,
       @CliOption(key = "logFilePathPattern", mandatory = true,
           help = "Fully qualified paths for the log files") final String logFilePathPattern,
-      @CliOption(key = "mergeRecords", mandatory = false, help = "If the records in the log files should be merged",
+      @CliOption(key = "mergeRecords", help = "If the records in the log files should be merged",
           unspecifiedDefaultValue = "false") final Boolean shouldMerge)
       throws IOException {
 
@@ -182,7 +181,7 @@ public class HoodieLogFileCommand implements CommandMarker {
     AvroSchemaConverter converter = new AvroSchemaConverter();
     // get schema from last log file
     Schema readerSchema =
-        converter.convert(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1))));
+        converter.convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1)))));
 
     List<IndexedRecord> allRecords = new ArrayList<>();
 
@@ -191,11 +190,11 @@ public class HoodieLogFileCommand implements CommandMarker {
       HoodieMergedLogRecordScanner scanner =
           new HoodieMergedLogRecordScanner(fs, client.getBasePath(), logFilePaths, readerSchema,
               client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(),
-              Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES),
-              Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-              Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
-              Integer.valueOf(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
-              HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+                  HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
+                  Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
+                  Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
+                  HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
+                  HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
       for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
         Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
         if (allRecords.size() < limit) {
@@ -205,7 +204,7 @@ public class HoodieLogFileCommand implements CommandMarker {
     } else {
       for (String logFile : logFilePaths) {
         Schema writerSchema = new AvroSchemaConverter()
-            .convert(SchemaUtil.readSchemaFromLogFile(client.getFs(), new Path(logFile)));
+            .convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(client.getFs(), new Path(logFile))));
         HoodieLogFormat.Reader reader =
             HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema);
         // read the avro blocks
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
index 152e21c..5346b98 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
@@ -31,6 +31,7 @@ import org.springframework.shell.core.annotation.CliCommand;
 import org.springframework.shell.core.annotation.CliOption;
 import org.springframework.stereotype.Component;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -50,7 +51,7 @@ public class HoodieSyncCommand implements CommandMarker {
           help = "total number of recent partitions to validate") final int partitionCount,
       @CliOption(key = {"hiveServerUrl"}, mandatory = true,
           help = "hiveServerURL to connect to") final String hiveServerUrl,
-      @CliOption(key = {"hiveUser"}, mandatory = false, unspecifiedDefaultValue = "",
+      @CliOption(key = {"hiveUser"}, unspecifiedDefaultValue = "",
           help = "hive username to connect to") final String hiveUser,
       @CliOption(key = {"hivePass"}, mandatory = true, unspecifiedDefaultValue = "",
           help = "hive password to connect to") final String hivePass)
@@ -80,33 +81,27 @@ public class HoodieSyncCommand implements CommandMarker {
     if (sourceLatestCommit != null
         && HoodieTimeline.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
       // source is behind the target
-      List<HoodieInstant> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
-          .getInstants().collect(Collectors.toList());
-      if (commitsToCatchup.isEmpty()) {
-        return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
-            + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount);
-      } else {
-        long newInserts = CommitUtil.countNewRecords(target,
-            commitsToCatchup.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
-        return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
-            + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount) + ". Catch up count is "
-            + newInserts;
-      }
+      return getString(target, targetTimeline, source, sourceCount, targetCount, sourceLatestCommit);
     } else {
-      List<HoodieInstant> commitsToCatchup = sourceTimeline.findInstantsAfter(targetLatestCommit, Integer.MAX_VALUE)
-          .getInstants().collect(Collectors.toList());
-      if (commitsToCatchup.isEmpty()) {
-        return "Count difference now is (count(" + source.getTableConfig().getTableName() + ") - count("
-            + target.getTableConfig().getTableName() + ") == " + (sourceCount - targetCount);
-      } else {
-        long newInserts = CommitUtil.countNewRecords(source,
-            commitsToCatchup.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
-        return "Count difference now is (count(" + source.getTableConfig().getTableName() + ") - count("
-            + target.getTableConfig().getTableName() + ") == " + (sourceCount - targetCount) + ". Catch up count is "
-            + newInserts;
-      }
+      return getString(source, sourceTimeline, target, targetCount, sourceCount, targetLatestCommit);
 
     }
   }
 
+  private String getString(HoodieTableMetaClient target, HoodieTimeline targetTimeline, HoodieTableMetaClient source, long sourceCount, long targetCount, String sourceLatestCommit)
+          throws IOException {
+    List<HoodieInstant> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
+        .getInstants().collect(Collectors.toList());
+    if (commitsToCatchup.isEmpty()) {
+      return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
+          + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount);
+    } else {
+      long newInserts = CommitUtil.countNewRecords(target,
+          commitsToCatchup.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
+      return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
+          + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount) + ". Catch up count is "
+          + newInserts;
+    }
+  }
+
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 4b4c283..37f66f6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -49,7 +49,7 @@ public class RepairsCommand implements CommandMarker {
           mandatory = true) final String duplicatedPartitionPath,
       @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
           mandatory = true) final String repairedOutputPath,
-      @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path",
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
           mandatory = true) final String sparkPropertiesPath)
       throws Exception {
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java
index a3eb6a7..4a122c6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java
@@ -99,20 +99,18 @@ public class RollbacksCommand implements CommandMarker {
     HoodieRollbackMetadata metadata = AvroUtils.deserializeAvroMetadata(
         activeTimeline.getInstantDetails(new HoodieInstant(State.COMPLETED, ROLLBACK_ACTION, rollbackInstant)).get(),
         HoodieRollbackMetadata.class);
-    metadata.getPartitionMetadata().entrySet().forEach(e -> {
-      Stream
-          .concat(e.getValue().getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
-              e.getValue().getFailedDeleteFiles().stream().map(f -> Pair.of(f, false)))
-          .forEach(fileWithDeleteStatus -> {
-            Comparable[] row = new Comparable[5];
-            row[0] = metadata.getStartRollbackTime();
-            row[1] = metadata.getCommitsRollback().toString();
-            row[2] = e.getKey();
-            row[3] = fileWithDeleteStatus.getLeft();
-            row[4] = fileWithDeleteStatus.getRight();
-            rows.add(row);
-          });
-    });
+    metadata.getPartitionMetadata().forEach((key, value) -> Stream
+            .concat(value.getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
+                    value.getFailedDeleteFiles().stream().map(f -> Pair.of(f, false)))
+            .forEach(fileWithDeleteStatus -> {
+              Comparable[] row = new Comparable[5];
+              row[0] = metadata.getStartRollbackTime();
+              row[1] = metadata.getCommitsRollback().toString();
+              row[2] = key;
+              row[3] = fileWithDeleteStatus.getLeft();
+              row[4] = fileWithDeleteStatus.getRight();
+              rows.add(row);
+            }));
 
     TableHeader header = new TableHeader().addTableHeaderField("Instant").addTableHeaderField("Rolledback Instants")
         .addTableHeaderField("Partition").addTableHeaderField("Deleted File").addTableHeaderField("Succeeded");
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
index 69a1584..d28ba27 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
@@ -93,7 +93,7 @@ public class SavepointsCommand implements CommandMarker {
   @CliCommand(value = "savepoint rollback", help = "Savepoint a commit")
   public String rollbackToSavepoint(
       @CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String commitTime,
-      @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath)
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath)
       throws Exception {
     HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     if (metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty()) {
@@ -122,7 +122,7 @@ public class SavepointsCommand implements CommandMarker {
   }
 
   @CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
-  public String refreshMetaClient() throws IOException {
+  public String refreshMetaClient() {
     HoodieCLI.refreshTableMetadata();
     return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed.";
   }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 8ff52fa..13d1c8b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -124,19 +124,19 @@ public class SparkMain {
       case COMPACT_REPAIR:
         assert (args.length == 8);
         doCompactRepair(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-            Boolean.valueOf(args[7]));
+            Boolean.parseBoolean(args[7]));
         returnCode = 0;
         break;
       case COMPACT_UNSCHEDULE_FILE:
         assert (args.length == 9);
         doCompactUnscheduleFile(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-            Boolean.valueOf(args[7]), Boolean.valueOf(args[8]));
+            Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
         returnCode = 0;
         break;
       case COMPACT_UNSCHEDULE_PLAN:
         assert (args.length == 9);
         doCompactUnschedule(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-            Boolean.valueOf(args[7]), Boolean.valueOf(args[8]));
+            Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
         returnCode = 0;
         break;
       case CLEAN:
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
index 7fc3b25..b05aee2 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
@@ -73,7 +73,6 @@ public class StatsCommand implements CommandMarker {
     HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
 
     List<Comparable[]> rows = new ArrayList<>();
-    int i = 0;
     DecimalFormat df = new DecimalFormat("#.00");
     for (HoodieInstant commitTime : timeline.getInstants().collect(Collectors.toList())) {
       String waf = "0";
@@ -94,7 +93,7 @@ public class StatsCommand implements CommandMarker {
     rows.add(new Comparable[] {"Total", totalRecordsUpserted, totalRecordsWritten, waf});
 
     TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Upserted")
-        .addTableHeaderField("Total Written").addTableHeaderField("Write Amplifiation Factor");
+        .addTableHeaderField("Total Written").addTableHeaderField("Write Amplification Factor");
     return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
   }
 
@@ -120,7 +119,7 @@ public class StatsCommand implements CommandMarker {
 
     // max, min, #small files < 10MB, 50th, avg, 95th
     Histogram globalHistogram = new Histogram(new UniformReservoir(MAX_FILES));
-    HashMap<String, Histogram> commitHistoMap = new HashMap<String, Histogram>();
+    HashMap<String, Histogram> commitHistoMap = new HashMap<>();
     for (FileStatus fileStatus : statuses) {
       String commitTime = FSUtils.getCommitTime(fileStatus.getPath().getName());
       long sz = fileStatus.getLen();
@@ -132,7 +131,6 @@ public class StatsCommand implements CommandMarker {
     }
 
     List<Comparable[]> rows = new ArrayList<>();
-    int ind = 0;
     for (String commitTime : commitHistoMap.keySet()) {
       Snapshot s = commitHistoMap.get(commitTime).getSnapshot();
       rows.add(printFileSizeHistogram(commitTime, s));
@@ -141,7 +139,7 @@ public class StatsCommand implements CommandMarker {
     rows.add(printFileSizeHistogram("ALL", s));
 
     Function<Object, String> converterFunction =
-        entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+        entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
     fieldNameToConverterMap.put("Min", converterFunction);
     fieldNameToConverterMap.put("10th", converterFunction);
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index 5b5a3f5..b71a979 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -24,7 +24,8 @@ import org.apache.hudi.cli.commands.SparkMain;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.StringUtils;
 
-import org.apache.log4j.Logger;
+import com.google.common.base.Preconditions;
+
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.launcher.SparkLauncher;
@@ -38,8 +39,7 @@ import java.util.Map;
  */
 public class SparkUtil {
 
-  private static final Logger LOG = Logger.getLogger(SparkUtil.class);
-  public static final String DEFUALT_SPARK_MASTER = "yarn-client";
+  public static final String DEFAULT_SPARK_MASTER = "yarn-client";
 
   /**
    * TODO: Need to fix a bunch of hardcoded stuff here eg: history server, spark distro.
@@ -55,7 +55,7 @@ public class SparkUtil {
       sparkLauncher.setPropertiesFile(propertiesFile);
     }
     File libDirectory = new File(new File(currentJar).getParent(), "lib");
-    for (String library : libDirectory.list()) {
+    for (String library : Preconditions.checkNotNull(libDirectory.list())) {
       sparkLauncher.addJar(new File(libDirectory, library).getAbsolutePath());
     }
     return sparkLauncher;
@@ -66,7 +66,7 @@ public class SparkUtil {
 
     String defMasterFromEnv = sparkConf.getenv("SPARK_MASTER");
     if ((null == defMasterFromEnv) || (defMasterFromEnv.isEmpty())) {
-      sparkConf.setMaster(DEFUALT_SPARK_MASTER);
+      sparkConf.setMaster(DEFAULT_SPARK_MASTER);
     } else {
       sparkConf.setMaster(defMasterFromEnv);
     }
@@ -82,7 +82,7 @@ public class SparkUtil {
     sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
     sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
 
-    sparkConf = HoodieWriteClient.registerClasses(sparkConf);
+    HoodieWriteClient.registerClasses(sparkConf);
     JavaSparkContext jsc = new JavaSparkContext(sparkConf);
     jsc.hadoopConfiguration().setBoolean("parquet.enable.summary-metadata", false);
     FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());


Mime
View raw message