hudi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vin...@apache.org
Subject [incubator-hudi] branch master updated: [HUDI-499] Allow update partition path with GLOBAL_BLOOM (#1187)
Date Wed, 05 Feb 2020 17:33:43 GMT
This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c1516df  [HUDI-499] Allow update partition path with GLOBAL_BLOOM (#1187)
c1516df is described below

commit c1516df8ac55757ebd07d8aa459a0ceedeccab7b
Author: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
AuthorDate: Wed Feb 5 09:33:33 2020 -0800

    [HUDI-499] Allow update partition path with GLOBAL_BLOOM (#1187)
    
    * Handle partition path update by deleting a record from the old partition and
      insert into the new one
    * Add a new configuration "hoodie.bloom.index.update.partition.path" to
      enable the behavior
    * Add a new unit test case for global bloom index
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  | 18 +++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 +
 .../hudi/index/bloom/HoodieGlobalBloomIndex.java   | 23 +++++-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    | 89 ++++++++++++++++++++++
 4 files changed, 131 insertions(+), 3 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index d39fae1..db83498 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -77,6 +77,17 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
   public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage.level";
   public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
 
+  /**
+   * Only applies if index type is GLOBAL_BLOOM.
+   * <p>
+   * When set to true, an update to a record with a different partition from its existing one
+   * will insert the record to the new partition and delete it from the old partition.
+   * <p>
+   * When set to false, a record will be updated to the old partition.
+   */
+  public static final String BLOOM_INDEX_UPDATE_PARTITION_PATH = "hoodie.bloom.index.update.partition.path";
+  public static final String DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH = "false";
+
   private HoodieIndexConfig(Properties props) {
     super(props);
   }
@@ -176,6 +187,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withBloomIndexUpdatePartitionPath(boolean updatePartitionPath) {
+      props.setProperty(BLOOM_INDEX_UPDATE_PARTITION_PATH, String.valueOf(updatePartitionPath));
+      return this;
+    }
+
     public HoodieIndexConfig build() {
       HoodieIndexConfig config = new HoodieIndexConfig(props);
      setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
@@ -190,6 +206,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
           DEFAULT_BLOOM_INDEX_USE_CACHING);
       setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_INPUT_STORAGE_LEVEL), BLOOM_INDEX_INPUT_STORAGE_LEVEL,
           DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL);
+      setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_UPDATE_PARTITION_PATH),
+          BLOOM_INDEX_UPDATE_PARTITION_PATH, DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH);
       setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_TREE_BASED_FILTER_PROP),
           BLOOM_INDEX_TREE_BASED_FILTER_PROP, DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER);
       setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_BUCKETIZED_CHECKING_PROP),
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7fc0680..642384b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -431,6 +431,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return StorageLevel.fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));
   }
 
+  public boolean getBloomIndexUpdatePartitionPath() {
+    return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH));
+  }
+
   /**
    * storage properties.
    */
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index be6f524..ba8976b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.index.bloom;
 
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -36,6 +37,8 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.Optional;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -114,14 +117,28 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
        keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1)));
 
    // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join.
-    return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record -> {
+    return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().flatMap(record -> {
       final HoodieRecord<T> hoodieRecord = record._1;
      final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record._2;
       if (recordLocationHoodieKeyPair.isPresent()) {
         // Record key matched to file
-        return getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), Option.ofNullable(recordLocationHoodieKeyPair.get()._1));
+        if (config.getBloomIndexUpdatePartitionPath()
+            && !recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
+          // Create an empty record to delete the record in the old partition
+          HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
+              new EmptyHoodieRecordPayload());
+          // Tag the incoming record for inserting to the new partition
+          HoodieRecord<T> taggedRecord = getTaggedRecord(hoodieRecord, Option.empty());
+          return Arrays.asList(emptyRecord, taggedRecord).iterator();
+        } else {
+          // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
+          // When it differs, the record will still be updated at its old partition.
+          return Collections.singletonList(
+              getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()),
+                  Option.ofNullable(recordLocationHoodieKeyPair.get()._1))).iterator();
+        }
       } else {
-        return getTaggedRecord(hoodieRecord, Option.empty());
+        return Collections.singletonList(getTaggedRecord(hoodieRecord, Option.empty())).iterator();
       }
     });
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index c605654..5e4e21b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -21,6 +21,7 @@ package org.apache.hudi.index.bloom;
 import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.TestRawTripPayload;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieAvroUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
@@ -56,6 +58,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
 
@@ -297,6 +300,92 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
     }
   }
 
+  @Test
+  public void testTagLocationWhenShouldUpdatePartitionPath() throws Exception {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withBloomIndexUpdatePartitionPath(true).build())
+        .build();
+    HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
+
+    // Create the original partition, and put a record, along with the meta file
+    // "2016/01/31": 1 file (1_0_20160131101010.parquet)
+    new File(basePath + "/2016/01/31").mkdirs();
+    new File(basePath + "/2016/01/31/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
+
+    // this record will be saved in table and will be tagged to an empty record
+    TestRawTripPayload originalPayload =
+        new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
+    HoodieRecord originalRecord =
+        new HoodieRecord(new HoodieKey(originalPayload.getRowKey(), originalPayload.getPartitionPath()),
+            originalPayload);
+
+    /*
+    This record has the same record key as originalRecord but different time so different partition
+    Because GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
+    globalBloomIndex should
+     - tag the original partition of the originalRecord to an empty record for deletion, and
+     - tag the new partition of the incomingRecord
+    */
+    TestRawTripPayload incomingPayload =
+        new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
+    HoodieRecord incomingRecord =
+        new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), incomingPayload.getPartitionPath()),
+            incomingPayload);
+
+    /*
+    This record has the same record key as originalRecord and the same partition
+    Though GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true,
+    globalBloomIndex should just tag the original partition
+    */
+    TestRawTripPayload incomingPayloadSamePartition =
+        new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T04:16:41.415Z\",\"number\":15}");
+    HoodieRecord incomingRecordSamePartition =
+        new HoodieRecord(
+            new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()),
+            incomingPayloadSamePartition);
+
+    HoodieClientTestUtils
+        .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), schema, null, false);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+
+    // Add some commits
+    new File(basePath + "/.hoodie").mkdirs();
+
+    // test against incoming record with a different partition
+    JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));
+    JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table);
+
+    assertEquals(2, taggedRecordRDD.count());
+    for (HoodieRecord record : taggedRecordRDD.collect()) {
+      switch (record.getPartitionPath()) {
+        case "2016/01/31":
+          assertEquals("000", record.getRecordKey());
+          assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
+          break;
+        case "2016/02/31":
+          assertEquals("000", record.getRecordKey());
+          assertEquals(incomingPayload.getJsonData(), ((TestRawTripPayload) record.getData()).getJsonData());
+          break;
+        default:
+          fail(String.format("Should not get partition path: %s", record.getPartitionPath()));
+      }
+    }
+
+    // test against incoming record with the same partition
+    JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
+        .parallelize(Collections.singletonList(incomingRecordSamePartition));
+    JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, table);
+
+    assertEquals(1, taggedRecordRDDSamePartition.count());
+    HoodieRecord record = taggedRecordRDDSamePartition.first();
+    assertEquals("000", record.getRecordKey());
+    assertEquals("2016/01/31", record.getPartitionPath());
+    assertEquals(incomingPayloadSamePartition.getJsonData(), ((TestRawTripPayload) record.getData()).getJsonData());
+  }
+
   // convert list to map to avoid sorting order dependencies
  private Map<String, BloomIndexFileInfo> toFileMap(List<Tuple2<String, BloomIndexFileInfo>> filesList) {
     Map<String, BloomIndexFileInfo> filesMap = new HashMap<>();


Mime
View raw message