kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From granthe...@apache.org
Subject [kudu] 02/02: KUDU-2810: [backup] Add workaround DELETE IGNORE support
Date Mon, 06 May 2019 22:10:13 GMT
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 11a6a06d646fc852f26e9ac6cfda8f3c5c09b9b0
Author: Grant Henke <granthenke@apache.org>
AuthorDate: Mon May 6 12:28:47 2019 -0500

    KUDU-2810: [backup] Add workaround DELETE IGNORE support
    
    This patch adds a client side version of DELETE IGNORE
    for use in the restore job. It does this the same way
    existing client side INSERT IGNORE support was added,
    by checking and filtering the error code client side.
    
    More optimal server side ignore functionality is tracked
    by KUDU-1563.
    
    Change-Id: Ib718b72a73b06d9b9dc809fde3c52a1d498ceb23
    Reviewed-on: http://gerrit.cloudera.org:8080/13246
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Grant Henke <granthenke@apache.org>
    Tested-by: Grant Henke <granthenke@apache.org>
---
 .../scala/org/apache/kudu/backup/KuduRestore.scala |  4 ++
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 37 ++++++++++++++++++
 .../org/apache/kudu/client/AsyncKuduSession.java   | 15 +++++++-
 .../main/java/org/apache/kudu/client/Batch.java    | 45 ++++++++++++++--------
 .../java/org/apache/kudu/client/KuduSession.java   | 10 +++++
 .../java/org/apache/kudu/client/Operation.java     | 14 +++++--
 .../apache/kudu/client/SessionConfiguration.java   | 30 +++++++++++++--
 .../org/apache/kudu/client/TestKuduSession.java    | 40 +++++++++++++++++++
 8 files changed, 172 insertions(+), 23 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 3b6a537..7f1e515 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -104,6 +104,10 @@ object KuduRestore {
           val partitioner = createPartitionFilter(metadata, lastMetadata)
           val session = context.syncClient.newSession
           session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
+          // In the case of task retries we need to ignore NotFound errors for deleted rows.
+          // TODO(KUDU-1563): Implement server side ignore capabilities to improve performance
+          //  and reliability.
+          session.setIgnoreAllNotFoundRows(true)
           try for (internalRow <- internalRows) {
             // Convert the InternalRows to Rows.
             // This avoids any corruption as reported in SPARK-26880.
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 6de14d3..0c837c0 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -34,6 +34,8 @@ import org.apache.kudu.test.RandomUtils
 import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
 import org.apache.kudu.util.HybridTimeUtil
 import org.apache.kudu.util.SchemaGenerator.SchemaGeneratorBuilder
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.scheduler.SparkListenerJobEnd
 import org.junit.Assert._
 import org.junit.After
 import org.junit.Before
@@ -436,6 +438,41 @@ class TestKuduBackup extends KuduTestSuite {
     restoreAndValidateTable(tableName, 50)
   }
 
+  @Test
+  def testDeleteIgnore(): Unit = {
+    insertRows(table, 100) // Insert data into the default test table.
+
+    // Run and validate initial backup.
+    backupAndValidateTable(tableName, 100, false)
+
+    // Delete the rows and validate incremental backup.
+    Range(0, 100).foreach(deleteRow)
+    backupAndValidateTable(tableName, 100, true)
+
+    // When restoring the table, delete half the rows after each job completes.
+    // This will force delete rows to cause NotFound errors and allow validation
+    // that they are correctly handled.
+    val listener = new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        val client = kuduContext.syncClient
+        val table = client.openTable(s"$tableName-restore")
+        val scanner = kuduContext.syncClient.newScannerBuilder(table).build()
+        val session = client.newSession()
+        scanner.asScala.foreach { rr =>
+          if (rr.getInt("key") % 2 == 0) {
+            val delete = table.newDelete()
+            val row = delete.getRow
+            row.addInt("key", rr.getInt("key"))
+            session.apply(delete)
+          }
+        }
+      }
+    }
+    ss.sparkContext.addSparkListener(listener)
+
+    restoreAndValidateTable(tableName, 0)
+  }
+
   def createPartitionRow(value: Int): PartialRow = {
     val row = schema.newPartialRow()
     row.addInt("key", value)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 0e174f7..2e2aab8 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -179,6 +179,7 @@ public class AsyncKuduSession implements SessionConfiguration {
   private volatile boolean closed = false;
 
   private boolean ignoreAllDuplicateRows = false;
+  private boolean ignoreAllNotFoundRows = false;
 
   /**
    * Package-private constructor meant to be used via AsyncKuduClient
@@ -281,6 +282,16 @@ public class AsyncKuduSession implements SessionConfiguration {
   }
 
   @Override
+  public boolean isIgnoreAllNotFoundRows() {
+    return ignoreAllNotFoundRows;
+  }
+
+  @Override
+  public void setIgnoreAllNotFoundRows(boolean ignoreAllNotFoundRows) {
+    this.ignoreAllNotFoundRows = ignoreAllNotFoundRows;
+  }
+
+  @Override
   public int countPendingErrors() {
     return errorCollector.countErrors();
   }
@@ -377,7 +388,8 @@ public class AsyncKuduSession implements SessionConfiguration {
 
         Batch batch = batches.get(tabletId);
         if (batch == null) {
-          batch = new Batch(operation.getTable(), tablet, ignoreAllDuplicateRows);
+          batch = new Batch(operation.getTable(), tablet, ignoreAllDuplicateRows,
+              ignoreAllNotFoundRows);
           batches.put(tabletId, batch);
         }
         batch.add(operation, currentIndex++);
@@ -548,6 +560,7 @@ public class AsyncKuduSession implements SessionConfiguration {
       }
       operation.setExternalConsistencyMode(this.consistencyMode);
       operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
+      operation.setIgnoreAllNotFoundRows(ignoreAllNotFoundRows);
 
       // Add a callback to update the propagated timestamp returned from the server.
       Callback<Deferred<OperationResponse>, OperationResponse> cb =
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index 9685923..a6f37f4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -18,17 +18,20 @@
 package org.apache.kudu.client;
 
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
+import org.apache.kudu.WireProtocol.AppStatusPB.ErrorCode;
 import org.apache.kudu.security.Token;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.jboss.netty.util.Timer;
 
-import org.apache.kudu.WireProtocol;
 import org.apache.kudu.client.Statistics.Statistic;
 import org.apache.kudu.client.Statistics.TabletStatistics;
 import org.apache.kudu.tserver.Tserver;
@@ -59,12 +62,25 @@ class Batch extends KuduRpc<BatchResponse> {
    */
   private long rowOperationsSizeBytes = 0;
 
-  /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
-  private final boolean ignoreAllDuplicateRows;
+  private final EnumSet<ErrorCode> ignoredErrors;
 
-  Batch(KuduTable table, LocatedTablet tablet, boolean ignoreAllDuplicateRows) {
+  Batch(KuduTable table, LocatedTablet tablet, boolean ignoreAllDuplicateRows,
+        boolean ignoreAllNotFoundRows) {
     super(table, null, 0);
-    this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
+    // Build a set of ignored errors.
+    Set<ErrorCode> ignoredErrors = new HashSet<>();
+    if (ignoreAllDuplicateRows) {
+      ignoredErrors.add(ErrorCode.ALREADY_PRESENT);
+    }
+    if (ignoreAllNotFoundRows) {
+      ignoredErrors.add(ErrorCode.NOT_FOUND);
+    }
+    // EnumSet.copyOf doesn't handle an empty set, so handle that case specially.
+    if (ignoredErrors.isEmpty()) {
+      this.ignoredErrors = EnumSet.noneOf(ErrorCode.class);
+    } else {
+      this.ignoredErrors = EnumSet.copyOf(ignoredErrors);
+    }
     this.tablet = tablet;
   }
 
@@ -149,19 +165,16 @@ class Batch extends KuduRpc<BatchResponse> {
     readProtobuf(callResponse.getPBMessage(), builder);
 
     List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB = builder.getPerRowErrorsList();
-    if (ignoreAllDuplicateRows) {
-      boolean allAlreadyPresent = true;
+    // Create a new list of errors that doesn't contain ignored error codes.
+    if (!ignoredErrors.isEmpty()) {
+      List<Tserver.WriteResponsePB.PerRowErrorPB> filteredErrors = new ArrayList<>();
       for (Tserver.WriteResponsePB.PerRowErrorPB errorPB : errorsPB) {
-        if (errorPB.getError().getCode() != WireProtocol.AppStatusPB.ErrorCode.ALREADY_PRESENT) {
-          allAlreadyPresent = false;
-          break;
+        if (!ignoredErrors.contains(errorPB.getError().getCode())) {
+          filteredErrors.add(errorPB);
         }
       }
-      if (allAlreadyPresent) {
-        errorsPB = Collections.emptyList();
-      }
+      errorsPB = filteredErrors;
     }
-
     BatchResponse response = new BatchResponse(timeoutTracker.getElapsedMillis(),
                                                tsUUID,
                                                builder.getTimestamp(),
@@ -220,7 +233,7 @@ class Batch extends KuduRpc<BatchResponse> {
     return MoreObjects.toStringHelper(this)
                       .add("operations", operations.size())
                       .add("tablet", tablet)
-                      .add("ignoreAllDuplicateRows", ignoreAllDuplicateRows)
+                      .add("ignoredErrors", Iterables.toString(ignoredErrors))
                       .add("rpc", super.toString())
                       .toString();
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
index 0daaecb..c181eca 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
@@ -176,6 +176,16 @@ public class KuduSession implements SessionConfiguration {
   }
 
   @Override
+  public boolean isIgnoreAllNotFoundRows() {
+    return session.isIgnoreAllNotFoundRows();
+  }
+
+  @Override
+  public void setIgnoreAllNotFoundRows(boolean ignoreAllNotFoundRows) {
+    session.setIgnoreAllNotFoundRows(ignoreAllNotFoundRows);
+  }
+
+  @Override
   public int countPendingErrors() {
     return session.countPendingErrors();
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index f7b69fb..bb655f0 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
+import org.apache.kudu.WireProtocol.AppStatusPB.ErrorCode;
 import org.apache.kudu.security.Token;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -37,7 +38,6 @@ import org.jboss.netty.util.Timer;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.WireProtocol;
 import org.apache.kudu.WireProtocol.RowOperationsPB;
 import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
 import org.apache.kudu.client.Statistics.Statistic;
@@ -94,6 +94,8 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
 
   /** See {@link SessionConfiguration#setIgnoreAllDuplicateRows(boolean)} */
   boolean ignoreAllDuplicateRows = false;
+  /** See {@link SessionConfiguration#setIgnoreAllNotFoundRows(boolean)} */
+  boolean ignoreAllNotFoundRows = false;
 
   /**
    * Package-private constructor. Subclasses need to be instantiated via AsyncKuduSession
@@ -126,6 +128,11 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
     this.ignoreAllDuplicateRows = ignoreAllDuplicateRows;
   }
 
+  /** See {@link SessionConfiguration#setIgnoreAllNotFoundRows(boolean)} */
+  void setIgnoreAllNotFoundRows(boolean ignoreAllNotFoundRows) {
+    this.ignoreAllNotFoundRows = ignoreAllNotFoundRows;
+  }
+
   /**
    * Classes extending Operation need to have a specific ChangeType
    * @return Operation's ChangeType
@@ -189,8 +196,9 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
     Tserver.WriteResponsePB.PerRowErrorPB error = null;
     if (builder.getPerRowErrorsCount() != 0) {
       error = builder.getPerRowErrors(0);
-      if (ignoreAllDuplicateRows &&
-          error.getError().getCode() == WireProtocol.AppStatusPB.ErrorCode.ALREADY_PRESENT) {
+      ErrorCode errorCode = error.getError().getCode();
+      if ((ignoreAllDuplicateRows && errorCode == ErrorCode.ALREADY_PRESENT) ||
+          (ignoreAllNotFoundRows && errorCode == ErrorCode.NOT_FOUND)) {
         error = null;
       }
     }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
index 1bdfb98..12aa31b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
@@ -151,9 +151,12 @@ public interface SessionConfiguration {
 
   /**
   * Configures the option to ignore all the row errors if they are all of the AlreadyPresent type.
-   * This can be needed when facing KUDU-568. The effect of enabling this is that operation
-   * responses that match this pattern will be cleared of their row errors, meaning that we consider
-   * them successful.
+   * This can be useful when it is possible for INSERT operations to be retried and fail.
+   * The effect of enabling this is that operation responses that match this pattern will be
+   * cleared of their row errors, meaning that we consider them successful.
+   *
+   * TODO(KUDU-1563): Implement server side ignore capabilities to improve performance and
+   *  reliability of INSERT ignore operations.
    *
    * <p>Disabled by default.
    * @param ignoreAllDuplicateRows true if this session should enforce this, else false
@@ -161,6 +164,27 @@ public interface SessionConfiguration {
   void setIgnoreAllDuplicateRows(boolean ignoreAllDuplicateRows);
 
   /**
+   * Tells if the session is currently ignoring row errors when the whole list returned by a tablet
+   * server is of the NotFound type.
+   * @return true if the session is enforcing this, else false
+   */
+  boolean isIgnoreAllNotFoundRows();
+
+  /**
+   * Configures the option to ignore all the row errors if they are all of the NotFound type.
+   * This can be useful when it is possible for DELETE operations to be retried and fail.
+   * The effect of enabling this is that operation responses that match this pattern will be
+   * cleared of their row errors, meaning that we consider them successful.
+   *
+   * TODO(KUDU-1563): Implement server side ignore capabilities to improve performance and
+   *  reliability of DELETE ignore operations.
+   *
+   * <p>Disabled by default.
+   * @param ignoreAllNotFoundRows true if this session should enforce this, else false
+   */
+  void setIgnoreAllNotFoundRows(boolean ignoreAllNotFoundRows);
+
+  /**
    * Return the number of errors which are pending. Errors may accumulate when
    * using {@link FlushMode#AUTO_FLUSH_BACKGROUND AUTO_FLUSH_BACKGROUND} mode.
    * @return a count of errors
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
index 1df4490..628a05f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
@@ -87,6 +87,8 @@ public class TestKuduSession {
     for (int i = 0; i < 10; i++) {
       session.apply(createInsert(table, i));
     }
+    // Test all of the various flush modes to be sure we correctly handle errors in
+    // individual operations and batches.
     for (SessionConfiguration.FlushMode mode : SessionConfiguration.FlushMode.values()) {
       session.setFlushMode(mode);
       for (int i = 0; i < 10; i++) {
@@ -110,6 +112,37 @@ public class TestKuduSession {
   }
 
   @Test(timeout = 100000)
+  public void testIgnoreAllNotFoundRows() throws Exception {
+    KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
+
+    KuduSession session = client.newSession();
+    session.setIgnoreAllNotFoundRows(true);
+    // Test all of the various flush modes to be sure we correctly handle errors in
+    // individual operations and batches.
+    for (SessionConfiguration.FlushMode mode : SessionConfiguration.FlushMode.values()) {
+      session.setFlushMode(mode);
+      for (int i = 0; i < 10; i++) {
+        session.apply(createDelete(table, i));
+        OperationResponse resp = session.apply(createInsert(table, i));
+        if (mode == SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC) {
+          assertFalse(resp.hasRowError());
+        }
+      }
+      if (mode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
+        List<OperationResponse> responses = session.flush();
+        for (OperationResponse resp : responses) {
+          assertFalse(resp.hasRowError());
+        }
+      } else if (mode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+        while (session.hasPendingOperations()) {
+          Thread.sleep(100);
+        }
+        assertEquals(0, session.countPendingErrors());
+      }
+    }
+  }
+
+  @Test(timeout = 100000)
   public void testBatchWithSameRow() throws Exception {
     KuduTable table = client.createTable(tableName, basicSchema, getBasicCreateTableOptions());
 
@@ -415,4 +448,11 @@ public class TestKuduSession {
     row.addBoolean(4, true);
     return upsert;
   }
+
+  private Delete createDelete(KuduTable table, int key) {
+    Delete delete = table.newDelete();
+    PartialRow row = delete.getRow();
+    row.addInt(0, key);
+    return delete;
+  }
 }


Mime
View raw message