hudi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vbal...@apache.org
Subject [incubator-hudi] branch master updated: [HUDI-667] Fixing delete tests for DeltaStreamer (#1395)
Date Wed, 11 Mar 2020 23:19:35 GMT
This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ca912a  [HUDI-667] Fixing delete tests for DeltaStreamer (#1395)
1ca912a is described below

commit 1ca912af0904283a270a822d5876babca5c89739
Author: Sivabalan Narayanan <sivabala@uber.com>
AuthorDate: Wed Mar 11 16:19:23 2020 -0700

    [HUDI-667] Fixing delete tests for DeltaStreamer (#1395)
---
 .../hudi/common/HoodieTestDataGenerator.java       | 42 +++++++++++-----------
 .../hudi/utilities/TestHoodieDeltaStreamer.java    |  4 +--
 2 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index e0d2a53..6d86e93 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -399,7 +399,6 @@ public class HoodieTestDataGenerator {
    */
   public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer
n) {
     final Set<KeyPartition> used = new HashSet<>();
-
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique updates is greater than number
of available keys");
     }
@@ -429,24 +428,24 @@ public class HoodieTestDataGenerator {
    */
   public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) {
     final Set<KeyPartition> used = new HashSet<>();
-
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique deletes is greater than number
of available keys");
     }
 
-    return IntStream.range(0, n).boxed().map(i -> {
-      int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
-      KeyPartition kp = existingKeys.get(index);
-      // Find the available keyPartition starting from randomly chosen one.
-      while (used.contains(kp)) {
+    List<HoodieKey> result = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      int index = RAND.nextInt(numExistingKeys);
+      while (!existingKeys.containsKey(index)) {
         index = (index + 1) % numExistingKeys;
-        kp = existingKeys.get(index);
       }
-      existingKeys.remove(kp);
+      KeyPartition kp = existingKeys.remove(index);
+      existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
+      existingKeys.remove(numExistingKeys - 1);
       numExistingKeys--;
       used.add(kp);
-      return kp.key;
-    });
+      result.add(kp.key);
+    }
+    return result.stream();
   }
 
   /**
@@ -458,28 +457,29 @@ public class HoodieTestDataGenerator {
    */
   public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String commitTime, Integer
n) {
     final Set<KeyPartition> used = new HashSet<>();
-
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique deletes is greater than number
of available keys");
     }
 
-    return IntStream.range(0, n).boxed().map(i -> {
-      int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
-      KeyPartition kp = existingKeys.get(index);
-      // Find the available keyPartition starting from randomly chosen one.
-      while (used.contains(kp)) {
+    List<HoodieRecord> result = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      int index = RAND.nextInt(numExistingKeys);
+      while (!existingKeys.containsKey(index)) {
         index = (index + 1) % numExistingKeys;
-        kp = existingKeys.get(index);
       }
-      existingKeys.remove(kp);
+      // swap chosen index with last index and remove last entry. 
+      KeyPartition kp = existingKeys.remove(index);
+      existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
+      existingKeys.remove(numExistingKeys - 1);
       numExistingKeys--;
       used.add(kp);
       try {
-        return new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime));
+        result.add(new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime)));
       } catch (IOException e) {
         throw new HoodieIOException(e.getMessage(), e);
       }
-    });
+    }
+    return result.stream();
   }
 
   public String[] getPartitionPaths() {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 9d324dc..100faa2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -422,8 +422,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       } else {
         TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs);
       }
-      TestHelpers.assertRecordCount(totalRecords + 200, tableBasePath + "/*/*.parquet", sqlContext);
-      TestHelpers.assertDistanceCount(totalRecords + 200, tableBasePath + "/*/*.parquet",
sqlContext);
+      TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
+      TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
       return true;
     }, 180);
     ds.shutdownGracefully();


Mime
View raw message