hudi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From le...@apache.org
Subject [incubator-hudi] branch master updated: [HUDI-595] code cleanup, refactoring code out of PR# 1159 (#1302)
Date Tue, 04 Feb 2020 13:52:11 GMT
This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 594da28  [HUDI-595] code cleanup, refactoring code out of PR# 1159 (#1302)
594da28 is described below

commit 594da28fbf64fb20432e718a409577fd10516c4a
Author: Suneel Marthi <smarthi@apache.org>
AuthorDate: Tue Feb 4 14:52:03 2020 +0100

    [HUDI-595] code cleanup, refactoring code out of PR# 1159 (#1302)
---
 .../org/apache/hudi/index/hbase/HBaseIndex.java    | 23 ++-------
 .../org/apache/hudi/io/HoodieCommitArchiveLog.java |  6 +--
 .../io/compact/strategy/CompactionStrategy.java    |  8 +--
 .../main/java/org/apache/hudi/metrics/Metrics.java | 17 +++----
 .../src/test/java/org/apache/hudi/TestCleaner.java | 37 +++++++-------
 .../hudi/common/HoodieTestDataGenerator.java       | 14 ++----
 .../index/bloom/TestHoodieGlobalBloomIndex.java    | 51 ++++++++++++-------
 .../org/apache/hudi/common/HoodieJsonPayload.java  |  5 +-
 .../hudi/common/table/log/HoodieLogFileReader.java | 33 ++++++------
 .../java/org/apache/hudi/common/util/FSUtils.java  |  2 +-
 .../hudi/common/minicluster/HdfsTestService.java   |  2 +-
 .../hudi/common/table/log/TestHoodieLogFormat.java | 26 +++-------
 .../table/view/TestHoodieTableFileSystemView.java  | 10 ++--
 .../common/util/collection/TestDiskBasedMap.java   |  6 +--
 .../hadoop/hive/HoodieCombineHiveInputFormat.java  |  9 ++--
 .../realtime/TestHoodieRealtimeRecordReader.java   |  5 +-
 .../java/org/apache/hudi/hive/util/SchemaUtil.java | 31 +++++-------
 .../org/apache/hudi/hive/TestHiveSyncTool.java     |  4 +-
 .../org/apache/hudi/hive/util/HiveTestService.java | 11 ++--
 .../org/apache/hudi/integ/ITTestHoodieDemo.java    | 58 +++++++++++-----------
 .../apache/hudi/utilities/HDFSParquetImporter.java |  7 ++-
 .../hudi/utilities/perf/TimelineServerPerf.java    | 20 +++-----
 .../sources/helpers/IncrSourceHelper.java          |  4 +-
 .../utilities/sources/helpers/KafkaOffsetGen.java  |  7 +--
 24 files changed, 172 insertions(+), 224 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
index 3f79096..12d352d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
@@ -205,9 +205,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
         }
       }
       List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
-      HTable hTable = null;
-      try {
-        hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
+      try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName))) {
         List<Get> statements = new ArrayList<>();
         List<HoodieRecord> currentBatchOfRecords = new LinkedList<>();
         // Do the tagging.
@@ -250,15 +248,6 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
         }
       } catch (IOException e) {
         throw new HoodieIndexException("Failed to Tag indexed locations because of exception with HBase Client", e);
-      } finally {
-        if (hTable != null) {
-          try {
-            hTable.close();
-          } catch (IOException e) {
-            // Ignore
-          }
-        }
-
       }
       return taggedRecords.iterator();
     };
@@ -444,16 +433,14 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
      */
     public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer, int numTasksDuringPut,
         int maxExecutors, int sleepTimeMs, float qpsFraction) {
-      int numRSAlive = numRegionServersForTable;
-      int maxReqPerSec = (int) (qpsFraction * numRSAlive * maxQpsPerRegionServer);
-      int numTasks = numTasksDuringPut;
-      int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors));
+      int maxReqPerSec = (int) (qpsFraction * numRegionServersForTable * maxQpsPerRegionServer);
+      int maxParallelPuts = Math.max(1, Math.min(numTasksDuringPut, maxExecutors));
       int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs;
       int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec));
       LOG.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction);
-      LOG.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive);
+      LOG.info("HbaseIndexThrottling: numRSAlive :" + numRegionServersForTable);
       LOG.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec);
-      LOG.info("HbaseIndexThrottling: numTasks :" + numTasks);
+      LOG.info("HbaseIndexThrottling: numTasks :" + numTasksDuringPut);
       LOG.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors);
       LOG.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts);
       LOG.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec);
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java
index bafbc8d..6847a24 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java
@@ -147,9 +147,9 @@ public class HoodieCommitArchiveLog {
     HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
         .getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants();
     Stream<HoodieInstant> instants = cleanAndRollbackTimeline.getInstants()
-        .collect(Collectors.groupingBy(s -> s.getAction())).entrySet().stream().map(i -> {
-          if (i.getValue().size() > maxCommitsToKeep) {
-            return i.getValue().subList(0, i.getValue().size() - minCommitsToKeep);
+        .collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream().map(hoodieInstants -> {
+          if (hoodieInstants.size() > maxCommitsToKeep) {
+            return hoodieInstants.subList(0, hoodieInstants.size() - minCommitsToKeep);
           } else {
             return new ArrayList<HoodieInstant>();
           }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java
index 4c03116..dd17212 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/strategy/CompactionStrategy.java
@@ -62,10 +62,10 @@ public abstract class CompactionStrategy implements Serializable {
   public Map<String, Double> captureMetrics(HoodieWriteConfig writeConfig, Option<HoodieBaseFile> dataFile,
       String partitionPath, List<HoodieLogFile> logFiles) {
     Map<String, Double> metrics = Maps.newHashMap();
-    Long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
+    long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
     // Total size of all the log files
     Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0)
-        .reduce((size1, size2) -> size1 + size2).orElse(0L);
+        .reduce(Long::sum).orElse(0L);
     // Total read will be the base file + all the log files
     Long totalIORead =
         FSUtils.getSizeInMB((dataFile.isPresent() ? dataFile.get().getFileSize() : 0L) + totalLogFileSize);
@@ -73,11 +73,11 @@ public abstract class CompactionStrategy implements Serializable {
     Long totalIOWrite =
         FSUtils.getSizeInMB(dataFile.isPresent() ? dataFile.get().getFileSize() : defaultMaxParquetFileSize);
     // Total IO will the the IO for read + write
-    Long totalIO = totalIORead + totalIOWrite;
+    long totalIO = totalIORead + totalIOWrite;
     // Save these metrics and we will use during the filter
     metrics.put(TOTAL_IO_READ_MB, totalIORead.doubleValue());
     metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite.doubleValue());
-    metrics.put(TOTAL_IO_MB, totalIO.doubleValue());
+    metrics.put(TOTAL_IO_MB, (double) totalIO);
     metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue());
     metrics.put(TOTAL_LOG_FILES, (double) logFiles.size());
     return metrics;
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
index 4b19441..533208f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -49,17 +49,14 @@ public class Metrics {
     }
     // reporter.start();
 
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        try {
-          reporter.report();
-          Closeables.close(reporter.getReporter(), true);
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        reporter.report();
+        Closeables.close(reporter.getReporter(), true);
+      } catch (Exception e) {
+        e.printStackTrace();
       }
-    });
+    }));
   }
 
   public static Metrics getInstance() {
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
index 24aa9cd..662273a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
@@ -68,6 +68,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -106,7 +107,7 @@ public class TestCleaner extends TestHoodieClientBase {
       Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
       Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
 
-    /**
+    /*
      * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
      * in insert(), if the implementation diverges.)
      */
@@ -606,8 +607,8 @@ public class TestCleaner extends TestHoodieClientBase {
     String filePath2 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName2;
 
     List<String> deletePathPatterns1 = Arrays.asList(filePath1, filePath2);
-    List<String> successDeleteFiles1 = Arrays.asList(filePath1);
-    List<String> failedDeleteFiles1 = Arrays.asList(filePath2);
+    List<String> successDeleteFiles1 = Collections.singletonList(filePath1);
+    List<String> failedDeleteFiles1 = Collections.singletonList(filePath2);
 
     // create partition1 clean stat.
     HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
@@ -630,7 +631,8 @@ public class TestCleaner extends TestHoodieClientBase {
 
     // map with relative path.
     Map<String, Tuple3> newExpected = new HashMap<>();
-    newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Arrays.asList(fileName1), Arrays.asList(fileName2)));
+    newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Collections.singletonList(fileName1),
+            Collections.singletonList(fileName2)));
     newExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2));
 
     HoodieCleanMetadata metadata =
@@ -1079,19 +1081,18 @@ public class TestCleaner extends TestHoodieClientBase {
     });
 
     // Test for progress (Did we clean some files ?)
-    long numFilesUnderCompactionDeleted = hoodieCleanStats.stream().flatMap(cleanStat -> {
-      return convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns())
-          .map(fileIdWithCommitTime -> {
-            if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
-              Assert.assertTrue("Deleted instant time must be less than pending compaction",
-                  HoodieTimeline.compareTimestamps(
-                      fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()),
-                      fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER));
-              return true;
-            }
-            return false;
-          });
-    }).filter(x -> x).count();
+    long numFilesUnderCompactionDeleted = hoodieCleanStats.stream()
+            .flatMap(cleanStat -> convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns())
+        .map(fileIdWithCommitTime -> {
+          if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
+            Assert.assertTrue("Deleted instant time must be less than pending compaction",
+                HoodieTimeline.compareTimestamps(
+                    fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()),
+                    fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER));
+            return true;
+          }
+          return false;
+        })).filter(x -> x).count();
     long numDeleted =
         hoodieCleanStats.stream().mapToLong(cleanStat -> cleanStat.getDeletePathPatterns().size()).sum();
     // Tighter check for regression
@@ -1123,7 +1124,7 @@ public class TestCleaner extends TestHoodieClientBase {
    * @throws IOException in case of error
    */
   private int getTotalTempFiles() throws IOException {
-    RemoteIterator itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
+    RemoteIterator<?> itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
     int count = 0;
     while (itr.hasNext()) {
       count++;
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index 56b3c07..e0d2a53 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -210,13 +210,10 @@ public class HoodieTestDataGenerator {
     Path commitFile =
         new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
     FileSystem fs = FSUtils.getFs(basePath, configuration);
-    FSDataOutputStream os = fs.create(commitFile, true);
-    HoodieCompactionPlan workload = new HoodieCompactionPlan();
-    try {
+    try (FSDataOutputStream os = fs.create(commitFile, true)) {
+      HoodieCompactionPlan workload = new HoodieCompactionPlan();
       // Write empty commit metadata
       os.writeBytes(new String(AvroUtils.serializeCompactionPlan(workload).get(), StandardCharsets.UTF_8));
-    } finally {
-      os.close();
     }
   }
 
@@ -225,13 +222,10 @@ public class HoodieTestDataGenerator {
     Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
         + HoodieTimeline.makeSavePointFileName(commitTime));
     FileSystem fs = FSUtils.getFs(basePath, configuration);
-    FSDataOutputStream os = fs.create(commitFile, true);
-    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
-    try {
+    try (FSDataOutputStream os = fs.create(commitFile, true)) {
+      HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
       // Write empty commit metadata
       os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    } finally {
-      os.close();
     }
   }
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 15a8af7..c605654 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -246,13 +247,17 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
 
     String filename0 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Arrays.asList(record1), schema, null, false);
+        HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Collections.singletonList(record1),
+                schema, null, false);
     String filename1 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(), schema, null, false);
+        HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(),
+                schema, null, false);
     String filename2 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record2), schema, null, false);
+        HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record2),
+                schema, null, false);
     String filename3 =
-        HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record4), schema, null, false);
+        HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record4),
+                schema, null, false);
 
     // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -265,21 +270,29 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
     JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table);
 
     for (HoodieRecord record : taggedRecordRDD.collect()) {
-      if (record.getRecordKey().equals("000")) {
-        assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename0)));
-        assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData());
-      } else if (record.getRecordKey().equals("001")) {
-        assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2)));
-        assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData());
-      } else if (record.getRecordKey().equals("002")) {
-        assertTrue(!record.isCurrentLocationKnown());
-        assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData());
-      } else if (record.getRecordKey().equals("003")) {
-        assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
-        assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData());
-      } else if (record.getRecordKey().equals("004")) {
-        assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
-        assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData());
+      switch (record.getRecordKey()) {
+        case "000":
+          assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename0));
+          assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData());
+          break;
+        case "001":
+          assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2));
+          assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData());
+          break;
+        case "002":
+          assertFalse(record.isCurrentLocationKnown());
+          assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData());
+          break;
+        case "003":
+          assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
+          assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData());
+          break;
+        case "004":
+          assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
+          assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData());
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown Key: " + record.getRecordKey());
       }
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java
index 9e95fd8..1c15c66 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java
@@ -86,11 +86,8 @@ public class HoodieJsonPayload implements HoodieRecordPayload<HoodieJsonPayload>
   }
 
   private String unCompressData(byte[] data) throws IOException {
-    InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data));
-    try {
+    try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) {
       return FileIOUtils.readAsUTFString(iis, dataSize);
-    } finally {
-      iis.close();
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 354f809..40a5243 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -46,6 +46,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Scans a log file and provides block level iterator on the log file Loads the entire block contents in memory Can emit
@@ -107,25 +108,22 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
    * Close the inputstream if not closed when the JVM exits.
    */
   private void addShutDownHook() {
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        try {
-          close();
-        } catch (Exception e) {
-          LOG.warn("unable to close input stream for log file " + logFile, e);
-          // fail silently for any sort of exception
-        }
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        close();
+      } catch (Exception e) {
+        LOG.warn("unable to close input stream for log file " + logFile, e);
+        // fail silently for any sort of exception
       }
-    });
+    }));
   }
 
   // TODO : convert content and block length to long by using ByteBuffer, raw byte [] allows
   // for max of Integer size
   private HoodieLogBlock readBlock() throws IOException {
 
-    int blocksize = -1;
-    int type = -1;
+    int blocksize;
+    int type;
     HoodieLogBlockType blockType = null;
     Map<HeaderMetadataType, String> header = null;
 
@@ -190,7 +188,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
     // 9. Read the log block end position in the log file
     long blockEndPos = inputStream.getPos();
 
-    switch (blockType) {
+    switch (Objects.requireNonNull(blockType)) {
       // based on type read the block
       case AVRO_DATA_BLOCK:
         if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
@@ -278,10 +276,10 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
     }
   }
 
-  @Override
-  /**
+  /*
    * hasNext is not idempotent. TODO - Fix this. It is okay for now - PR
    */
+  @Override
   public boolean hasNext() {
     try {
       return readMagic();
@@ -315,10 +313,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
     long pos = inputStream.getPos();
     // 1. Read magic header from the start of the block
     inputStream.readFully(MAGIC_BUFFER, 0, 6);
-    if (!Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC)) {
-      return false;
-    }
-    return true;
+    return Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
index 43b0030..87925c7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
@@ -193,7 +193,7 @@ public class FSUtils {
     return partitions;
   }
 
-  public static final List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
+  public static List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
       String markerDir) throws IOException {
     List<String> dataFiles = new LinkedList<>();
     processFiles(fs, markerDir, (status) -> {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
index 9bc9a8d..7fb3bfd 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
@@ -79,7 +79,7 @@ public class HdfsTestService {
 
     // Configure and start the HDFS cluster
     // boolean format = shouldFormatDFSCluster(localDFSLocation, clean);
-    hadoopConf = configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort,
+    configureDFSCluster(hadoopConf, localDFSLocation, bindIP, namenodeRpcPort,
         datanodePort, datanodeIpcPort, datanodeHttpPort);
     miniDfsCluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(format).checkDataNodeAddrConfig(true)
         .checkDataNodeHostConfig(true).build();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
index 1b9667c..6c01d4a 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
@@ -285,7 +285,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     }
   }
 
-  /**
+  /*
    * This is actually a test on concurrent append and not recovery lease. Commenting this out.
    * https://issues.apache.org/jira/browse/HUDI-117
    */
@@ -337,7 +337,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     assertEquals(2, statuses.length);
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testBasicWriteAndScan() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
@@ -366,7 +365,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testBasicAppendAndRead() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
@@ -434,7 +432,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testBasicAppendAndScanMultipleFiles() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
@@ -911,11 +908,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
     writer = writer.appendBlock(dataBlock);
 
-    List<String> originalKeys =
-        copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
-            .collect(Collectors.toList());
-
-    // Delete 50 keys
     // Delete 50 keys
     List<HoodieKey> deletedKeys = copyOfRecords1.stream()
         .map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
@@ -1127,8 +1119,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
    * duplicate data.
    *
    */
-  private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2)
-      throws IOException, URISyntaxException, InterruptedException {
+  private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2) {
     try {
       // Write one Data block with same InstantTime (written in same batch)
       Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -1178,8 +1169,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   }
 
   @Test
-  public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt()
-      throws IOException, URISyntaxException, InterruptedException {
+  public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt() {
     /*
      * FIRST_ATTEMPT_FAILED:
      * Original task from the stage attempt failed, but subsequent stage retry succeeded.
@@ -1188,8 +1178,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   }
 
   @Test
-  public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt()
-      throws IOException, URISyntaxException, InterruptedException {
+  public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt() {
     /*
      * SECOND_ATTEMPT_FAILED:
      * Original task from stage attempt succeeded, but subsequent retry attempt failed.
@@ -1198,8 +1187,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   }
 
   @Test
-  public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts()
-      throws IOException, URISyntaxException, InterruptedException {
+  public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts() {
     /*
      * BOTH_ATTEMPTS_SUCCEEDED:
      * Original task from the stage attempt and duplicate task from the stage retry succeeded.
@@ -1207,7 +1195,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     testAvroLogRecordReaderMergingMultipleLogFiles(100, 100);
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
@@ -1335,7 +1322,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testBasicAppendAndTraverseInReverse() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
@@ -1392,7 +1378,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   }
 
   @Test
-  public void testV0Format() throws IOException, InterruptedException, URISyntaxException {
+  public void testV0Format() throws IOException, URISyntaxException {
     // HoodieLogFormatVersion.DEFAULT_VERSION has been deprecated so we cannot
     // create a writer for it. So these tests are only for the HoodieAvroDataBlock
     // of older version.
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 2b8f04f..0a910e9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -221,7 +221,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
    */
   public Stream<FileSlice> getLatestRawFileSlices(String partitionPath) {
     return fsView.getAllFileGroups(partitionPath).map(HoodieFileGroup::getLatestFileSlicesIncludingInflight)
-        .filter(fileSliceOpt -> fileSliceOpt.isPresent()).map(Option::get);
+        .filter(Option::isPresent).map(Option::get);
   }
 
   /**
@@ -322,7 +322,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
       assertEquals("Expect only valid data-file", dataFileName, dataFiles.get(0).getFileName());
     }
 
-    /** Merge API Tests **/
+    // Merge API Tests
     List<FileSlice> fileSliceList =
         rtView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5).collect(Collectors.toList());
     assertEquals("Expect file-slice to be merged", 1, fileSliceList.size());
@@ -355,7 +355,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
     assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
     assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
 
-    /** Data Files API tests */
+    // Data Files API tests
     dataFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
     if (skipCreatingDataFile) {
       assertEquals("Expect no data file to be returned", 0, dataFiles.size());
@@ -385,7 +385,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
       dataFiles.forEach(df -> assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1));
     }
 
-    /** Inflight/Orphan File-groups needs to be in the view **/
+    // Inflight/Orphan File-groups needs to be in the view
 
     // There is a data-file with this inflight file-id
     final String inflightFileId1 = UUID.randomUUID().toString();
@@ -507,7 +507,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
     assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
     assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
 
-    /** Data Files API tests */
+    // Data Files API tests
     dataFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
     assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
     dataFiles.forEach(df -> assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), compactionRequestedTime));
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java
index 76d7e06..2cc726e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestDiskBasedMap.java
@@ -167,7 +167,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness {
     schema = SchemaTestUtil.getSimpleSchema();
     List<IndexedRecord> indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
     hoodieRecords =
-        indexedRecords.stream().map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
+        indexedRecords.stream().map(r -> new HoodieRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
             new AvroBinaryTestPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList());
     payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0), new HoodieRecordSizeEstimator(schema));
     assertTrue(payloadSize > 0);
@@ -176,7 +176,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness {
     final Schema simpleSchemaWithMetadata = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
     indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
     hoodieRecords = indexedRecords.stream()
-        .map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
+        .map(r -> new HoodieRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
             new AvroBinaryTestPayload(
                 Option.of(HoodieAvroUtils.rewriteRecord((GenericRecord) r, simpleSchemaWithMetadata)))))
         .collect(Collectors.toList());
@@ -193,7 +193,7 @@ public class TestDiskBasedMap extends HoodieCommonTestHarness {
     // Test sizeEstimatorPerformance with simpleSchema
     Schema schema = SchemaTestUtil.getSimpleSchema();
     List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
-    HoodieRecordSizeEstimator sizeEstimator = new HoodieRecordSizeEstimator(schema);
+    HoodieRecordSizeEstimator sizeEstimator = new HoodieRecordSizeEstimator<>(schema);
     HoodieRecord record = hoodieRecords.remove(0);
     long startTime = System.currentTimeMillis();
     SpillableMapUtils.computePayloadSize(record, sizeEstimator);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index 506b6cf..0c3f141 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -333,8 +333,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
       if (o instanceof CombinePathInputFormat) {
         CombinePathInputFormat mObj = (CombinePathInputFormat) o;
         return (opList.equals(mObj.opList)) && (inputFormatClassName.equals(mObj.inputFormatClassName))
-            && (deserializerClassName == null ? (mObj.deserializerClassName == null)
-                : deserializerClassName.equals(mObj.deserializerClassName));
+            && (Objects.equals(deserializerClassName, mObj.deserializerClassName));
       }
       return false;
     }
@@ -353,16 +352,16 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     init(job);
     Map<Path, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
     Map<String, Operator<? extends OperatorDesc>> aliasToWork = mrwork.getAliasToWork();
-    /** MOD - Initialize a custom combine input format shim that will call listStatus on the custom inputFormat **/
+    /* MOD - Initialize a custom combine input format shim that will call listStatus on the custom inputFormat **/
     HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim combine =
-        new HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim();
+        new HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim<>();
 
     InputSplit[] splits;
 
     if (combine.getInputPathsShim(job).length == 0) {
       throw new IOException("No input paths specified in job");
     }
-    ArrayList<InputSplit> result = new ArrayList<>();
+    List<InputSplit> result = new ArrayList<>();
 
     // combine splits only from same tables and same partitions. Do not combine splits from multiple
     // tables or multiple partitions.
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 89b7168..0586bc4 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -210,7 +210,7 @@ public class TestHoodieRealtimeRecordReader {
             action.equals(HoodieTimeline.ROLLBACK_ACTION) ? String.valueOf(baseInstantTs + logVersion - 2)
                 : instantTime;
 
-        HoodieLogFormat.Writer writer = null;
+        HoodieLogFormat.Writer writer;
         if (action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
           writer = writeRollback(partitionDir, schema, "fileid0", baseInstant, instantTime,
               String.valueOf(baseInstantTs + logVersion - 1), logVersion);
@@ -317,7 +317,7 @@ public class TestHoodieRealtimeRecordReader {
         numRecordsAtCommit2++;
         Assert.assertTrue(gotKey > firstBatchLastRecordKey);
         Assert.assertTrue(gotKey <= secondBatchLastRecordKey);
-        assertEquals((int) gotKey, lastSeenKeyFromLog + 1);
+        assertEquals(gotKey, lastSeenKeyFromLog + 1);
         lastSeenKeyFromLog++;
       } else {
         numRecordsAtCommit1++;
@@ -491,7 +491,6 @@ public class TestHoodieRealtimeRecordReader {
     writer = writeRollbackBlockToLogFile(partitionDir, schema, "fileid0", commitTime, newCommitTime, "101", 1);
     logFilePaths.add(writer.getLogFile().getPath().toString());
     writer.close();
-    assertTrue("block - size should be > 0", size > 0);
     InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
 
     // create a split with baseFile (parquet file written earlier) and new log file(s)
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
index d85778a..6ca9957 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.hive.util;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -26,11 +30,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.SchemaDifference;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -46,7 +45,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * Schema Utilities.
@@ -367,10 +365,9 @@ public class SchemaUtil {
       return true;
     } else if (prevType.equalsIgnoreCase("float") && newType.equalsIgnoreCase("double")) {
       return true;
-    } else if (prevType.contains("struct") && newType.toLowerCase().contains("struct")) {
-      return true;
+    } else {
+      return prevType.contains("struct") && newType.toLowerCase().contains("struct");
     }
-    return false;
   }
 
   public static String generateSchemaString(MessageType storageSchema) throws IOException {
@@ -403,18 +400,17 @@ public class SchemaUtil {
           .append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString());
     }
 
-    String partitionsStr = partitionFields.stream().collect(Collectors.joining(","));
+    String partitionsStr = String.join(",", partitionFields);
     StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE  IF NOT EXISTS ");
-    sb = sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
+    sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
             .append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
-    sb = sb.append("( ").append(columns).append(")");
+    sb.append("( ").append(columns).append(")");
     if (!config.partitionFields.isEmpty()) {
-      sb = sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
+      sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
     }
-    sb = sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
-    sb = sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
-    sb = sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.basePath)
-        .append("'");
+    sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
+    sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
+    sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.basePath).append("'");
     return sb.toString();
   }
 
@@ -433,7 +429,6 @@ public class SchemaUtil {
    * 
    * @return
    */
-  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
   public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
     Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
     HoodieAvroDataBlock lastBlock = null;
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 50bb0e5..49692f5 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -38,7 +38,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -47,7 +46,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-@SuppressWarnings("ConstantConditions")
 @RunWith(Parameterized.class)
 public class TestHiveSyncTool {
 
@@ -64,7 +62,7 @@ public class TestHiveSyncTool {
   }
 
   @Before
-  public void setUp() throws IOException, InterruptedException, URISyntaxException {
+  public void setUp() throws IOException, InterruptedException {
     TestUtil.setUp();
   }
 
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
index d82c33b..fc7675f 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
@@ -265,11 +265,11 @@ public class HiveTestService {
             ? new ChainedTTransportFactory(new TFramedTransport.Factory(), new TUGIContainingTransport.Factory())
             : new TUGIContainingTransport.Factory();
 
-        processor = new TUGIBasedProcessor<IHMSHandler>(handler);
+        processor = new TUGIBasedProcessor<>(handler);
         LOG.info("Starting DB backed MetaStore Server with SetUGI enabled");
       } else {
         transFactory = useFramedTransport ? new TFramedTransport.Factory() : new TTransportFactory();
-        processor = new TSetIpAddressProcessor<IHMSHandler>(handler);
+        processor = new TSetIpAddressProcessor<>(handler);
         LOG.info("Starting DB backed MetaStore Server");
       }
 
@@ -278,12 +278,7 @@ public class HiveTestService {
           .minWorkerThreads(minWorkerThreads).maxWorkerThreads(maxWorkerThreads);
 
       final TServer tServer = new TThreadPoolServer(args);
-      executorService.submit(new Runnable() {
-        @Override
-        public void run() {
-          tServer.serve();
-        }
-      });
+      executorService.submit(tServer::serve);
       return tServer;
     } catch (Throwable x) {
       throw new IOException(x);
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index 7d11414..f61028e 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -33,35 +33,35 @@ import java.util.List;
  */
 public class ITTestHoodieDemo extends ITTestBase {
 
-  private static String HDFS_DATA_DIR = "/usr/hive/data/input";
-  private static String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/batch_1.json";
-  private static String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/batch_2.json";
-  private static String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/presto-table-check.commands";
-  private static String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/presto-batch1.commands";
-  private static String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/presto-batch2-after-compaction.commands";
-
-  private static String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json";
-  private static String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands";
-  private static String PRESTO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/presto-batch1.commands";
-  private static String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json";
-  private static String PRESTO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/presto-batch2-after-compaction.commands";
-
-  private static String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow";
-  private static String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor";
-  private static String COW_TABLE_NAME = "stock_ticks_cow";
-  private static String MOR_TABLE_NAME = "stock_ticks_mor";
-
-  private static String DEMO_CONTAINER_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/setup_demo_container.sh";
-  private static String MIN_COMMIT_TIME_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time.sh";
-  private static String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh";
-  private static String COMPACTION_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/compaction.commands";
-  private static String SPARKSQL_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch1.commands";
-  private static String SPARKSQL_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch2.commands";
-  private static String SPARKSQL_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-incremental.commands";
-  private static String HIVE_TBLCHECK_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-table-check.commands";
-  private static String HIVE_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch1.commands";
-  private static String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands";
-  private static String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands";
+  private static final String HDFS_DATA_DIR = "/usr/hive/data/input";
+  private static final String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/batch_1.json";
+  private static final String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/batch_2.json";
+  private static final String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/presto-table-check.commands";
+  private static final String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/presto-batch1.commands";
+  private static final String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/presto-batch2-after-compaction.commands";
+
+  private static final String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json";
+  private static final String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands";
+  private static final String PRESTO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/presto-batch1.commands";
+  private static final String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json";
+  private static final String PRESTO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/presto-batch2-after-compaction.commands";
+
+  private static final String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow";
+  private static final String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor";
+  private static final String COW_TABLE_NAME = "stock_ticks_cow";
+  private static final String MOR_TABLE_NAME = "stock_ticks_mor";
+
+  private static final String DEMO_CONTAINER_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/setup_demo_container.sh";
+  private static final String MIN_COMMIT_TIME_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time.sh";
+  private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh";
+  private static final String COMPACTION_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/compaction.commands";
+  private static final String SPARKSQL_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch1.commands";
+  private static final String SPARKSQL_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch2.commands";
+  private static final String SPARKSQL_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-incremental.commands";
+  private static final String HIVE_TBLCHECK_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-table-check.commands";
+  private static final String HIVE_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch1.commands";
+  private static final String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands";
+  private static final String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands";
 
   private static String HIVE_SYNC_CMD_FMT =
       " --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index 2cfc914..aaddee7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -260,14 +260,13 @@ public class HDFSParquetImporter implements Serializable {
     public int parallelism = 1;
     @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true)
     public String schemaFile = null;
-    @Parameter(names = {"--format", "-f"}, description = "Format for the input data.", required = false,
-        validateValueWith = FormatValidator.class)
+    @Parameter(names = {"--format", "-f"}, description = "Format for the input data.", validateValueWith = FormatValidator.class)
     public String format = null;
-    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
     public String sparkMaster = null;
     @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
     public String sparkMemory = null;
-    @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
     public int retry = 0;
     @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
         + "hoodie client for importing")
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index 7fa0da5..e9a5f80 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -114,7 +114,7 @@ public class TimelineServerPerf implements Serializable {
     d2.close();
 
     System.out.println("\n\n\nDumping all File Slices");
-    selected.stream().forEach(p -> fsView.getAllFileSlices(p).forEach(s -> System.out.println("\tMyFileSlice=" + s)));
+    selected.forEach(p -> fsView.getAllFileSlices(p).forEach(s -> System.out.println("\tMyFileSlice=" + s)));
 
     // Waiting for curl queries
     if (!useExternalTimelineServer && cfg.waitForManualQueries) {
@@ -131,17 +131,16 @@ public class TimelineServerPerf implements Serializable {
 
   public List<PerfStats> runLookups(JavaSparkContext jsc, List<String> partitionPaths, SyncableFileSystemView fsView,
       int numIterations, int concurrency) {
-    List<PerfStats> perfStats = jsc.parallelize(partitionPaths, cfg.numExecutors).flatMap(p -> {
+    return jsc.parallelize(partitionPaths, cfg.numExecutors).flatMap(p -> {
       ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(100);
       final List<PerfStats> result = new ArrayList<>();
       final List<ScheduledFuture<PerfStats>> futures = new ArrayList<>();
       List<FileSlice> slices = fsView.getLatestFileSlices(p).collect(Collectors.toList());
       String fileId = slices.isEmpty() ? "dummyId"
           : slices.get(new Random(Double.doubleToLongBits(Math.random())).nextInt(slices.size())).getFileId();
-      IntStream.range(0, concurrency).forEach(i -> {
-        futures.add(executor.schedule(() -> runOneRound(fsView, p, fileId, i, numIterations), 0, TimeUnit.NANOSECONDS));
-      });
-      futures.stream().forEach(x -> {
+      IntStream.range(0, concurrency).forEach(i -> futures.add(executor.schedule(() -> runOneRound(fsView, p, fileId,
+              i, numIterations), 0, TimeUnit.NANOSECONDS)));
+      futures.forEach(x -> {
         try {
           result.add(x.get());
         } catch (InterruptedException | ExecutionException e) {
@@ -149,12 +148,9 @@ public class TimelineServerPerf implements Serializable {
         }
       });
       System.out.println("SLICES are=");
-      slices.stream().forEach(s -> {
-        System.out.println("\t\tFileSlice=" + s);
-      });
+      slices.forEach(s -> System.out.println("\t\tFileSlice=" + s));
       return result.iterator();
     }).collect();
-    return perfStats;
   }
 
   private static PerfStats runOneRound(SyncableFileSystemView fsView, String partition, String fileId, int id,
@@ -194,7 +190,7 @@ public class TimelineServerPerf implements Serializable {
     }
 
     public void dump(List<PerfStats> stats) {
-      stats.stream().forEach(x -> {
+      stats.forEach(x -> {
         String row = String.format("%s,%d,%d,%d,%f,%f,%f,%f\n", x.partition, x.id, x.minTime, x.maxTime, x.meanTime,
             x.medianTime, x.p75, x.p95);
         System.out.println(row);
@@ -260,7 +256,7 @@ public class TimelineServerPerf implements Serializable {
     @Parameter(names = {"--num-iterations", "-i"}, description = "Number of iterations for each partitions")
     public Integer numIterations = 10;
 
-    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
     public String sparkMaster = "local[2]";
 
     @Parameter(names = {"--server-port", "-p"}, description = " Server Port")
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 54ea0f3..9787bab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -38,7 +38,7 @@ public class IncrSourceHelper {
   private static String getStrictlyLowerTimestamp(String timestamp) {
     long ts = Long.parseLong(timestamp);
     Preconditions.checkArgument(ts > 0, "Timestamp must be positive");
-    Long lower = ts - 1;
+    long lower = ts - 1;
     return "" + lower;
   }
 
@@ -73,7 +73,7 @@ public class IncrSourceHelper {
 
     Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
         .findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
-    return Pair.of(beginInstantTime, nthInstant.map(instant -> instant.getTimestamp()).orElse(beginInstantTime));
+    return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
   }
 
   /**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index a92a441..4ad8855 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -94,8 +94,7 @@ public class KafkaOffsetGen {
 
       // Create initial offset ranges for each 'to' partition, with from = to offsets.
       OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
-      toOffsetMap.entrySet().stream().map(e -> {
-        TopicPartition tp = e.getKey();
+      toOffsetMap.keySet().stream().map(tp -> {
         long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
         return OffsetRange.create(tp, fromOffset, fromOffset);
       }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
@@ -208,9 +207,7 @@ public class KafkaOffsetGen {
     maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE)
         ? Config.maxEventsFromKafkaSource : maxEventsToReadFromKafka;
     long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit;
-    OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
-
-    return offsetRanges;
+    return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
   }
 
   // check up checkpoint offsets is valid or not, if true, return checkpoint offsets,


Mime
View raw message