hudi-commits mailing list archives

From vin...@apache.org
Subject [incubator-hudi] branch master updated: [HUDI-652] Decouple HoodieReadClient and AbstractHoodieClient to break the inheritance chain (#1372)
Date Fri, 06 Mar 2020 17:59:42 GMT
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5b32f  [HUDI-652] Decouple HoodieReadClient and AbstractHoodieClient to break the inheritance chain (#1372)
ee5b32f is described below

commit ee5b32f5d4aa26e7fc58ccdae46935f063460920
Author: vinoyang <yanghua1127@gmail.com>
AuthorDate: Sat Mar 7 01:59:35 2020 +0800

    [HUDI-652] Decouple HoodieReadClient and AbstractHoodieClient to break the inheritance chain (#1372)
    
    
    * Removed timeline server support
    * Removed try-with-resource
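
(Editor's note, not part of the commit message: the practical effect of this decoupling on callers is that HoodieReadClient no longer extends AbstractHoodieClient, so it is no longer AutoCloseable and no longer carries the embedded timeline service; it only implements Serializable. Below is a minimal caller-side sketch, assuming a JavaSparkContext, a HoodieWriteConfig, and an input JavaRDD<HoodieRecord> already exist in the surrounding application; ReadClientUsageSketch and tagExisting are illustrative names, not code from the repository.)

    import org.apache.hudi.client.HoodieReadClient;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // Illustrative sketch only; this class does not exist in the repository.
    public class ReadClientUsageSketch {
      public static JavaRDD<HoodieRecord> tagExisting(JavaSparkContext jsc, HoodieWriteConfig config,
                                                      JavaRDD<HoodieRecord> records) {
        // Before this commit: HoodieReadClient extended AbstractHoodieClient (AutoCloseable),
        // so callers typically wrapped it in try-with-resources:
        //   try (HoodieReadClient readClient = new HoodieReadClient<>(jsc, config)) { ... }
        // After this commit: plain construction is enough, since the client only implements
        // Serializable and holds no timeline-server resources that need closing.
        HoodieReadClient readClient = new HoodieReadClient<>(jsc, config);
        return readClient.tagLocation(records);
      }
    }

(The removed try-with-resources blocks in the test and DataSourceUtils hunks below follow this same pattern.)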
---
 .../org/apache/hudi/client/HoodieReadClient.java   |  9 ++-
 .../apache/hudi/client/TestHoodieReadClient.java   | 63 ++++++++---------
 .../apache/hudi/table/TestMergeOnReadTable.java    | 82 +++++++++++-----------
 .../hudi/table/compact/TestAsyncCompaction.java    | 25 +++----
 .../main/java/org/apache/hudi/DataSourceUtils.java |  3 +-
 5 files changed, 88 insertions(+), 94 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index e08ec34..33d661b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -46,6 +45,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.StructType;
 
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -56,7 +56,7 @@ import scala.Tuple2;
 /**
  * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
  */
-public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
+public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
 
   private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class);
 
@@ -65,9 +65,9 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo
    * basepath pointing to the table. Until, then just always assume a BloomIndex
    */
   private final transient HoodieIndex<T> index;
-  private final HoodieTimeline commitTimeline;
   private HoodieTable hoodieTable;
   private transient Option<SQLContext> sqlContextOpt;
+  private final transient JavaSparkContext jsc;
 
   /**
    * @param basePath path to Hoodie table
@@ -108,12 +108,11 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo
    */
   public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
       Option<EmbeddedTimelineService> timelineService) {
-    super(jsc, clientConfig, timelineService);
+    this.jsc = jsc;
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
     this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, jsc);
-    this.commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
     this.index = HoodieIndex.createIndex(clientConfig, jsc);
     this.sqlContextOpt = Option.empty();
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
index c57da14..6329e08 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
@@ -96,8 +96,8 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
    */
   private void testReadFilterExist(HoodieWriteConfig config,
       Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
-    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);
-        HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
+      HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());
       String newCommitTime = writeClient.startCommit();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
@@ -113,37 +113,36 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
       // Verify there are no errors
       assertNoWriteErrors(statuses);
 
-      try (HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());) {
-        filteredRDD = anotherReadClient.filterExists(recordsRDD);
-        List<HoodieRecord> result = filteredRDD.collect();
-        // Check results
-        assertEquals(25, result.size());
-
-        // check path exists for written keys
-        JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
-                anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));
-        JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
-                .map(keyPath -> keyPath._1);
-        assertEquals(75, keysWithPaths.count());
-
-        // verify rows match inserted records
-        Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
-        assertEquals(75, rows.count());
-
-        JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
-                .map(keyPath -> keyPath._1);
-
-        try {
-          anotherReadClient.readROView(keysWithoutPaths, 1);
-        } catch (Exception e) {
-          // data frame reader throws exception for empty records. ignore the error.
-          assertEquals(e.getClass(), AnalysisException.class);
-        }
-
-        // Actual tests of getPendingCompactions method are in TestAsyncCompaction
-        // This is just testing empty list
-        assertEquals(0, anotherReadClient.getPendingCompactions().size());
+      HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());
+      filteredRDD = anotherReadClient.filterExists(recordsRDD);
+      List<HoodieRecord> result = filteredRDD.collect();
+      // Check results
+      assertEquals(25, result.size());
+
+      // check path exists for written keys
+      JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
+              anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));
+      JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
+              .map(keyPath -> keyPath._1);
+      assertEquals(75, keysWithPaths.count());
+
+      // verify rows match inserted records
+      Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
+      assertEquals(75, rows.count());
+
+      JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
+              .map(keyPath -> keyPath._1);
+
+      try {
+        anotherReadClient.readROView(keysWithoutPaths, 1);
+      } catch (Exception e) {
+        // data frame reader throws exception for empty records. ignore the error.
+        assertEquals(e.getClass(), AnalysisException.class);
       }
+
+      // Actual tests of getPendingCompactions method are in TestAsyncCompaction
+      // This is just testing empty list
+      assertEquals(0, anotherReadClient.getPendingCompactions().size());
     }
   }
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index ab27920..740caf2 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -759,54 +759,54 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
       List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
       JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
-      try (HoodieReadClient readClient = new HoodieReadClient(jsc, config);) {
-        updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
 
-        // Write them to corresponding avro logfiles
-        HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
-            HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
+      HoodieReadClient readClient = new HoodieReadClient(jsc, config);
+      updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
 
-        // Verify that all data file has one log file
-        metaClient = HoodieTableMetaClient.reload(metaClient);
-        HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-        // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
-        ((SyncableFileSystemView) (table.getSliceView())).reset();
-
-        for (String partitionPath : dataGen.getPartitionPaths()) {
-          List<FileSlice> groupedLogFiles =
-              table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
-          for (FileSlice fileSlice : groupedLogFiles) {
-            assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
-          }
+      // Write them to corresponding avro logfiles
+      HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
+          HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
+
+      // Verify that all data file has one log file
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
+      ((SyncableFileSystemView) (table.getSliceView())).reset();
+
+      for (String partitionPath : dataGen.getPartitionPaths()) {
+        List<FileSlice> groupedLogFiles =
+            table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+        for (FileSlice fileSlice : groupedLogFiles) {
+          assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
         }
+      }
 
-        // Mark 2nd delta-instant as completed
-        metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
-            HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
-        metaClient.getActiveTimeline().saveAsComplete(
-            new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
+      // Mark 2nd delta-instant as completed
+      metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
+          HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
+      metaClient.getActiveTimeline().saveAsComplete(
+          new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
 
-        // Do a compaction
-        String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
-        JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
+      // Do a compaction
+      String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
+      JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
 
-        // Verify that recently written compacted data file has no log file
-        metaClient = HoodieTableMetaClient.reload(metaClient);
-        table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-        HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
-
-        assertTrue("Compaction commit should be > than last insert", HoodieTimeline
-            .compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime, HoodieTimeline.GREATER));
-
-        for (String partitionPath : dataGen.getPartitionPaths()) {
-          List<FileSlice> groupedLogFiles =
-              table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
-          for (FileSlice slice : groupedLogFiles) {
-            assertEquals("After compaction there should be no log files visible on a full
view", 0, slice.getLogFiles().count());
-          }
-          List<WriteStatus> writeStatuses = result.collect();
-          assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
+      // Verify that recently written compacted data file has no log file
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+
+      assertTrue("Compaction commit should be > than last insert", HoodieTimeline
+          .compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime, HoodieTimeline.GREATER));
+
+      for (String partitionPath : dataGen.getPartitionPaths()) {
+        List<FileSlice> groupedLogFiles =
+            table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+        for (FileSlice slice : groupedLogFiles) {
+          assertEquals("After compaction there should be no log files visible on a full view",
0, slice.getLogFiles().count());
         }
+        List<WriteStatus> writeStatuses = result.collect();
+        assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
       }
     }
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
index e81fa99..1a366a4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
@@ -92,9 +92,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testRollbackForInflightCompaction() throws Exception {
     // Rollback inflight compaction
     HoodieWriteConfig cfg = getConfig(false);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-         HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -155,9 +154,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
 
     int numRecs = 2000;
 
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-         HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
       records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
           new ArrayList<>());
@@ -197,9 +195,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testInflightCompaction() throws Exception {
     // There is inflight compaction. Subsequent compaction run must work correctly
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-         HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -351,9 +348,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testCompactionAfterTwoDeltaCommits() throws Exception {
     // No Delta Commits after compaction request
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-         HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -373,9 +369,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testInterleavedCompaction() throws Exception {
     // Case: Two delta commits before and after compaction schedule
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-         HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index a2dfe02..6a4ad03 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -223,7 +223,8 @@ public class DataSourceUtils {
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
                                                      HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
-    try (HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService)) {
+    try {
+      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
       return client.tagLocation(incomingHoodieRecords)
           .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
     } catch (TableNotFoundException e) {

