hudi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vin...@apache.org
Subject [incubator-hudi] branch master updated: [HUDI-671] Added unit-test for HBaseIndex (#1381)
Date Sun, 08 Mar 2020 00:48:53 GMT
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f8bf97  [HUDI-671] Added unit-test for HBaseIndex (#1381)
5f8bf97 is described below

commit 5f8bf970058d5915af1910dbf917a33356d06af2
Author: Prashant Wason <pwason@uber.com>
AuthorDate: Sat Mar 7 16:48:43 2020 -0800

    [HUDI-671] Added unit-test for HBaseIndex (#1381)
---
 .../java/org/apache/hudi/index/TestHbaseIndex.java | 133 ++++++++++++++++++++-
 1 file changed, 129 insertions(+), 4 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 2893947..53adf6c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -22,9 +22,11 @@ import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieHBaseIndexConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -38,6 +40,7 @@ import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -57,6 +60,7 @@ import org.junit.Test;
 import org.junit.runners.MethodSorters;
 import org.mockito.Mockito;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -65,6 +69,7 @@ import scala.Tuple2;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.times;
@@ -93,7 +98,10 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   @BeforeClass
   public static void init() throws Exception {
     // Initialize HbaseMiniCluster
-    utility = new HBaseTestingUtility();
+    hbaseConfig = HBaseConfiguration.create();
+    hbaseConfig.set("zookeeper.znode.parent", "/hudi-hbase-test");
+
+    utility = new HBaseTestingUtility(hbaseConfig);
     utility.startMiniCluster();
     hbaseConfig = utility.getConnection().getConfiguration();
     utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s"));
@@ -389,6 +397,117 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
         hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
   }
 
+  @Test
+  public void testSmallBatchSize() throws Exception {
+    String newCommitTime = "001";
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    // Load to memory
+    HoodieWriteConfig config = getConfig(2);
+    HBaseIndex index = new HBaseIndex(config);
+    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+
+      // Test tagLocation without any entries in index
+      JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+      assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+
+      // Insert 200 records
+      writeClient.startCommitWithTime(newCommitTime);
+      JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
+      assertNoWriteErrors(writeStatues.collect());
+
+      // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
+      // commit
+      javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+      assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()
== 0);
+
+      // Now commit this & update location of records inserted and validate no errors
+      writeClient.commit(newCommitTime, writeStatues);
+      // Now tagLocation for these records, hbaseIndex should tag them correctly
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+      assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
+      assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
+      assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null
+          && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
+    }
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    String newCommitTime = "001";
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    // Load to memory
+    HoodieWriteConfig config = getConfig();
+    HBaseIndex index = new HBaseIndex(config);
+    try (HoodieWriteClient writeClient = getWriteClient(config);) {
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+
+      // Test tagLocation without any entries in index
+      JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+      assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()
== 0);
+
+      // Insert records
+      writeClient.startCommitWithTime(newCommitTime);
+      JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
+      assertNoWriteErrors(writeStatues.collect());
+      writeClient.commit(newCommitTime, writeStatues);
+
+      // Now tagLocation for these records, hbaseIndex should tag them correctly
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+      assertEquals(10, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
+      assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
+      assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation() != null
+          && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
+
+      // Delete all records. This has to be done directly as deleting index entries
+      // is not implemented via HoodieWriteClient
+      Option recordMetadata = Option.empty();
+      JavaRDD<WriteStatus> deleteWriteStatues = writeStatues.map(w -> {
+        WriteStatus newWriteStatus = new WriteStatus(true, 1.0);
+        w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieRecord(r.getKey(), null), recordMetadata));
+        assertEquals(w.getTotalRecords(), newWriteStatus.getTotalRecords());
+        newWriteStatus.setStat(new HoodieWriteStat());
+        return newWriteStatus;
+      });
+      JavaRDD<WriteStatus> deleteStatus = index.updateLocation(deleteWriteStatues, jsc, hoodieTable);
+      assertEquals(deleteStatus.count(), deleteWriteStatues.count());
+      assertNoWriteErrors(deleteStatus.collect());
+
+      // Ensure no records can be tagged
+      javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+      assertEquals(0, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
+      assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
+      assertEquals(0, javaRDD.filter(record -> (record.getCurrentLocation() != null
+          && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
+    }
+  }
+
+  @Test
+  public void testFeatureSupport() throws Exception {
+    HoodieWriteConfig config = getConfig();
+    HBaseIndex index = new HBaseIndex(config);
+
+    assertTrue(index.canIndexLogFiles());
+    try {
+      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      index.fetchRecordLocation(jsc.parallelize(new ArrayList<HoodieKey>(), 1), jsc, hoodieTable);
+      fail("HbaseIndex supports fetchRecordLocation");
+    } catch (UnsupportedOperationException ex) {
+      // Expected so ignore
+      ex.getStackTrace();
+    }
+  }
+
   private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {
     final WriteStatus writeStatus = new WriteStatus(false, 0.1);
     HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
@@ -406,10 +525,14 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
   }
 
   private HoodieWriteConfig getConfig() {
-    return getConfigBuilder().build();
+    return getConfigBuilder(100).build();
+  }
+
+  private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) {
+    return getConfigBuilder(hbaseIndexBatchSize).build();
   }
 
-  private HoodieWriteConfig.Builder getConfigBuilder() {
+  private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(1, 1)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
@@ -419,8 +542,10 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
             .withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder()
                 .hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
+                .hbaseIndexPutBatchSizeAutoCompute(true)
+                .hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
                 .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
-                .hbaseIndexGetBatchSize(100).build())
+                .hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
             .build());
   }
 }


Mime
View raw message