kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject kudu git commit: KUDU-2078: Sink failure if batch size > session's flush buffer size
Date Thu, 31 Aug 2017 18:02:24 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 986e8de63 -> 294276070


KUDU-2078: Sink failure if batch size > session's flush buffer size

The Flume sink uses manual flush mode, so if users set the
sink's batch size parameter above the default buffer size for
manual flush, the sink could repeatedly fail batches. This
patch sets the session's buffer size (which is measured in
number of ops) equal to the batch size, so this problem
can no longer occur.

I considered using AUTO_FLUSH_BACKGROUND for the flushing as
well, but it can result in out-of-order writes, which might be
unexpected semantics for Flume (as opposed to, say, Spark).
Using AUTO_FLUSH_BACKGROUND with a high batch size would likely
be more performant, but we can add that as an additional
configuration later if the need arises.

Change-Id: Id1c54bcecc3e13ae64dd90efe6cf53021517dcdf
Reviewed-on: http://gerrit.cloudera.org:8080/7641
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/29427607
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/29427607
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/29427607

Branch: refs/heads/master
Commit: 2942760708d9bd252dc266434127d7eaa5107f55
Parents: 986e8de
Author: Will Berkeley <wdberkeley@apache.org>
Authored: Thu Aug 10 09:52:22 2017 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Thu Aug 31 18:02:06 2017 +0000

----------------------------------------------------------------------
 .../main/java/org/apache/kudu/flume/sink/KuduSink.java   | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/29427607/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
index 42f0542..d4e58b1 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
@@ -67,7 +67,7 @@ import org.apache.kudu.client.SessionConfiguration;
  *     The port is optional.</td></tr>
  * <tr><td>tableName</td><td></td><td>Yes</td>
  *     <td>The name of the Kudu table to write to.</td></tr>
- * <tr><td>batchSize</td><td>100</td><td>No</td>
+ * <tr><td>batchSize</td><td>1000</td><td>No</td>
  * <td>The maximum number of events the sink takes from the channel per transaction.</td></tr>
  * <tr><td>ignoreDuplicateRows</td><td>true</td>
  *     <td>No</td><td>Whether to ignore duplicate primary key errors caused by inserts.</td></tr>
@@ -93,7 +93,7 @@ import org.apache.kudu.client.SessionConfiguration;
 @InterfaceStability.Evolving
 public class KuduSink extends AbstractSink implements Configurable {
   private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
-  private static final Long DEFAULT_BATCH_SIZE = 100L;
+  private static final int DEFAULT_BATCH_SIZE = 1000;
   private static final Long DEFAULT_TIMEOUT_MILLIS =
           AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
   private static final String DEFAULT_KUDU_OPERATION_PRODUCER =
@@ -102,7 +102,7 @@ public class KuduSink extends AbstractSink implements Configurable {
 
   private String masterAddresses;
   private String tableName;
-  private long batchSize;
+  private int batchSize;
   private long timeoutMillis;
   private boolean ignoreDuplicateRows;
   private KuduTable table;
@@ -126,7 +126,7 @@ public class KuduSink extends AbstractSink implements Configurable {
     Preconditions.checkState(table == null && session == null,
         "Please call stop before calling start on an old instance.");
 
-    // client is not null only inside tests
+    // Client is not null only inside tests.
     if (client == null) {
       client = new KuduClient.KuduClientBuilder(masterAddresses).build();
     }
@@ -134,6 +134,7 @@ public class KuduSink extends AbstractSink implements Configurable {
     session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
     session.setTimeoutMillis(timeoutMillis);
     session.setIgnoreAllDuplicateRows(ignoreDuplicateRows);
+    session.setMutationBufferSpace(batchSize);
 
     try {
       table = client.openTable(tableName);
@@ -190,7 +191,7 @@ public class KuduSink extends AbstractSink implements Configurable {
         "Missing table name. Please specify property '%s'",
         TABLE_NAME);
 
-    batchSize = context.getLong(BATCH_SIZE, DEFAULT_BATCH_SIZE);
+    batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
     timeoutMillis = context.getLong(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS);
     ignoreDuplicateRows = context.getBoolean(IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS);
     String operationProducerType = context.getString(PRODUCER);


Mime
View raw message