kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject kudu git commit: KUDU-1882 Configuration improvements for Flume Kudu Sink regexp operations producer.
Date Tue, 25 Sep 2018 21:35:31 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 4fba6e8a1 -> 6030c5a32


KUDU-1882 Configuration improvements for Flume Kudu Sink regexp operations
producer.

Adding new properties for the different parsing error policies.
Deprecating the old ones. Default value constants are not removed as they are
public variables on a public class and it would be an API change.
Adding new test class to test the configuration and behaviour.

Change-Id: I0b352b27583ae9eee6cdbe28a768362bea36e00f
Reviewed-on: http://gerrit.cloudera.org:8080/11391
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6030c5a3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6030c5a3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6030c5a3

Branch: refs/heads/master
Commit: 6030c5a32dd0e0c825e8c4f9152a66253de5474c
Parents: 4fba6e8
Author: Ferenc Szabó <szaboferee@apache.org>
Authored: Wed Sep 5 19:21:17 2018 +0200
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue Sep 25 21:35:06 2018 +0000

----------------------------------------------------------------------
 .../sink/RegexpKuduOperationsProducer.java      | 141 ++++++++--
 ...expKuduOperationsProducerParseErrorTest.java | 273 +++++++++++++++++++
 2 files changed, 393 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6030c5a3/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
index ff9fd3c..a3b2c57 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
@@ -21,6 +21,7 @@ package org.apache.kudu.flume.sink;
 
 import java.math.BigDecimal;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -100,7 +101,9 @@ import org.apache.kudu.client.Upsert;
  *   <td>producer.skipMissingColumn</td>
  *   <td>false</td>
  *   <td>No</td>
- *   <td>What to do if a column in the Kudu table has no corresponding capture group.
+ *   <td>
+ *   <b>@deprecated</b><br/> use {@code producer.missingColumnPolicy}
+ *   What to do if a column in the Kudu table has no corresponding capture group.
  *   If set to {@code true}, a warning message is logged and the operation is still attempted.
  *   If set to {@code false}, an exception is thrown and the sink will not process the
  *   {@code Event}, causing a Flume {@code Channel} rollback.
@@ -109,7 +112,9 @@ import org.apache.kudu.client.Upsert;
  *   <td>producer.skipBadColumnValue</td>
  *   <td>false</td>
  *   <td>No</td>
- *   <td>What to do if a value in the pattern match cannot be coerced to the required type.
+ *   <td>
+ *   <b>@deprecated</b><br/> use {@code producer.badColumnValuePolicy}
+ *   What to do if a value in the pattern match cannot be coerced to the required type.
  *   If set to {@code true}, a warning message is logged and the operation is still attempted.
  *   If set to {@code false}, an exception is thrown and the sink will not process the
  *   {@code Event}, causing a Flume {@code Channel} rollback.
@@ -118,9 +123,44 @@ import org.apache.kudu.client.Upsert;
  *   <td>producer.skipUnmatchedRows</td>
  *   <td>true</td>
  *   <td>No</td>
- *   <td>Whether to log a warning about payloads that do not match the pattern. If set to
+ *   <td>
+ *   <b>@deprecated</b><br/> use {@code producer.unmatchedRowPolicy}
+ *   Whether to log a warning about payloads that do not match the pattern. If set to
  *   {@code false}, event bodies with no matches will be silently dropped.</td>
  * </tr>
+ * <tr>
+ *   <td>producer.missingColumnPolicy</td>
+ *   <td>REJECT</td>
+ *   <td>No</td>
+ *   <td>What to do if a column in the Kudu table has no corresponding capture group.<br/>
+ *   If set to {@code REJECT}, an exception is thrown and the sink will not process the
+ *   {@code Event}, causing a Flume {@code Channel} rollback.<br/>
+ *   If set to {@code WARN}, a warning message is logged and the operation is still produced.<br/>
+ *   If set to {@code IGNORE}, the operation is still produced without any log message.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.badColumnValuePolicy</td>
+ *   <td>REJECT</td>
+ *   <td>No</td>
+ *   <td>What to do if a value in the pattern match cannot be coerced to the required type.<br/>
+ *   If set to {@code REJECT}, an exception is thrown and the sink will not process the
+ *   {@code Event}, causing a Flume {@code Channel} rollback.<br/>
+ *   If set to {@code WARN}, a warning message is logged and the operation is still produced,
+ *   but does not include the given column.<br/>
+ *   If set to {@code IGNORE}, the operation is still produced, but does not include the given
+ *   column and does not log any message.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.unmatchedRowPolicy</td>
+ *   <td>WARN</td>
+ *   <td>No</td>
+ *   <td>What to do if a payload does not match the pattern.<br/>
+ *   If set to {@code REJECT}, an exception is thrown and the sink will not process the
+ *   {@code Event}, causing a Flume {@code Channel} rollback.<br/>
+ *   If set to {@code WARN}, a warning message is logged and the row is skipped,
+ *   not producing an operation.<br/>
+ *   If set to {@code IGNORE}, the row is skipped without any log message.</td>
+ * </tr>
  * </table>
  *
  * @see Pattern
@@ -138,20 +178,32 @@ public class RegexpKuduOperationsProducer implements KuduOperationsProducer {
   public static final String DEFAULT_ENCODING = "utf-8";
   public static final String OPERATION_PROP = "operation";
   public static final String DEFAULT_OPERATION = UPSERT;
+  @Deprecated
   public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn";
+  @Deprecated
   public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false;
+  @Deprecated
   public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue";
+  @Deprecated
   public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false;
+  @Deprecated
   public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows";
+  @Deprecated
   public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true;
+  public static final String MISSING_COLUMN_POLICY_PROP = "missingColumnPolicy";
+  public static final ParseErrorPolicy DEFAULT_MISSING_COLUMN_POLICY = ParseErrorPolicy.REJECT;
+  public static final String BAD_COLUMN_VALUE_POLICY_PROP = "badColumnValuePolicy";
+  public static final ParseErrorPolicy DEFAULT_BAD_COLUMN_VALUE_POLICY = ParseErrorPolicy.REJECT;
+  public static final String UNMATCHED_ROW_POLICY_PROP = "unmatchedRowPolicy";
+  public static final ParseErrorPolicy DEFAULT_UNMATCHED_ROW_POLICY = ParseErrorPolicy.WARN;
 
   private KuduTable table;
   private Pattern pattern;
   private Charset charset;
   private String operation;
-  private boolean skipMissingColumn;
-  private boolean skipBadColumnValue;
-  private boolean warnUnmatchedRows;
+  private ParseErrorPolicy missingColumnPolicy;
+  private ParseErrorPolicy badColumnValuePolicy;
+  private ParseErrorPolicy unmatchedRowPolicy;
 
   public RegexpKuduOperationsProducer() {
   }
@@ -180,12 +232,22 @@ public class RegexpKuduOperationsProducer implements KuduOperationsProducer {
         validOperations.contains(operation),
         "Unrecognized operation '%s'",
         operation);
-    skipMissingColumn = context.getBoolean(SKIP_MISSING_COLUMN_PROP,
-        DEFAULT_SKIP_MISSING_COLUMN);
-    skipBadColumnValue = context.getBoolean(SKIP_BAD_COLUMN_VALUE_PROP,
-        DEFAULT_SKIP_BAD_COLUMN_VALUE);
-    warnUnmatchedRows = context.getBoolean(WARN_UNMATCHED_ROWS_PROP,
-        DEFAULT_WARN_UNMATCHED_ROWS);
+
+
+    missingColumnPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
+      context, SKIP_MISSING_COLUMN_PROP, MISSING_COLUMN_POLICY_PROP,
+      ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_MISSING_COLUMN_POLICY
+    );
+
+    badColumnValuePolicy = getParseErrorPolicyCheckingDeprecatedProperty(
+      context, SKIP_BAD_COLUMN_VALUE_PROP, BAD_COLUMN_VALUE_POLICY_PROP,
+      ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_BAD_COLUMN_VALUE_POLICY
+    );
+
+    unmatchedRowPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
+      context, WARN_UNMATCHED_ROWS_PROP, UNMATCHED_ROW_POLICY_PROP,
+      ParseErrorPolicy.WARN, ParseErrorPolicy.IGNORE, DEFAULT_UNMATCHED_ROW_POLICY
+    );
   }
 
   @Override
@@ -223,20 +285,21 @@ public class RegexpKuduOperationsProducer implements KuduOperationsProducer {
           String msg = String.format(
               "Raw value '%s' couldn't be parsed to type %s for column '%s'",
               raw, col.getType(), col.getName());
-          logOrThrow(skipBadColumnValue, msg, e);
+          logOrThrow(badColumnValuePolicy, msg, e);
         } catch (IllegalArgumentException e) {
           String msg = String.format(
               "Column '%s' has no matching group in '%s'",
               col.getName(), raw);
-          logOrThrow(skipMissingColumn, msg, e);
+          logOrThrow(missingColumnPolicy, msg, e);
         } catch (Exception e) {
           throw new FlumeException("Failed to create Kudu operation", e);
         }
       }
       ops.add(op);
     }
-    if (!match && warnUnmatchedRows) {
-      logger.warn("Failed to match the pattern '{}' in '{}'", pattern, raw);
+    if (!match) {
+      String msg = String.format("Failed to match the pattern '%s' in '%s'", pattern, raw);
+      logOrThrow(unmatchedRowPolicy, msg, null);
     }
     return ops;
   }
@@ -291,16 +354,52 @@ public class RegexpKuduOperationsProducer implements KuduOperationsProducer {
     }
   }
 
-  private void logOrThrow(boolean log, String msg, Exception e)
+  private void logOrThrow(ParseErrorPolicy policy, String msg, Exception e)
       throws FlumeException {
-    if (log) {
-      logger.warn(msg, e);
-    } else {
-      throw new FlumeException(msg, e);
+    switch (policy) {
+      case REJECT:
+        throw new FlumeException(msg, e);
+      case WARN:
+        logger.warn(msg, e);
+        break;
+      case IGNORE:
+        // Fall through
+      default:
     }
   }
 
   /** Intentionally a no-op: this method performs no work when the producer is closed. */
   @Override
   public void close() {
   }
+
+  /**
+   * Resolves a parse error policy from the configuration, honoring the deprecated
+   * boolean property when it is present.
+   *
+   * <p>If the deprecated property is set, its boolean value selects {@code trueValue}
+   * or {@code falseValue}; setting the new property at the same time is rejected.
+   * Otherwise the new property is read as a case-insensitive {@link ParseErrorPolicy}
+   * name, falling back to {@code defaultValue} when it is not configured.
+   *
+   * @param context the producer configuration
+   * @param deprecatedPropertyName name of the old boolean property
+   * @param newPropertyName name of the policy property replacing it
+   * @param trueValue policy used when the deprecated property is {@code true}
+   * @param falseValue policy used when the deprecated property is {@code false}
+   * @param defaultValue policy used when neither property is configured
+   * @return the policy to apply
+   * @throws IllegalArgumentException if both properties are configured, or if the new
+   *         property does not name a known policy
+   */
+  private ParseErrorPolicy getParseErrorPolicyCheckingDeprecatedProperty(
+      Context context, String deprecatedPropertyName, String newPropertyName,
+      ParseErrorPolicy trueValue, ParseErrorPolicy falseValue, ParseErrorPolicy defaultValue) {
+    ParseErrorPolicy policy;
+    if (context.containsKey(deprecatedPropertyName)) {
+      logger.info("Configuration property {} is deprecated. Use {} instead.",
+          deprecatedPropertyName, newPropertyName);
+      // Preconditions only substitutes %s placeholders; SLF4J-style {} would be left
+      // verbatim in the message.
+      Preconditions.checkArgument(!context.containsKey(newPropertyName),
+          "Both %s and %s specified. Use only one of them, preferably %s.",
+          deprecatedPropertyName, newPropertyName, newPropertyName);
+      policy = context.getBoolean(deprecatedPropertyName) ? trueValue : falseValue;
+    } else {
+      String policyString = context.getString(newPropertyName, defaultValue.name());
+      try {
+        // Upper-case with Locale.ROOT so the lookup does not depend on the JVM's default
+        // locale (e.g. "ignore" must not become "\u0130GNORE" under a Turkish locale).
+        policy = ParseErrorPolicy.valueOf(policyString.toUpperCase(java.util.Locale.ROOT));
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgumentException(
+          "Unknown policy '" + policyString + "'. Use one of the following: " +
+              Arrays.toString(ParseErrorPolicy.values()), e);
+      }
+    }
+
+    return policy;
+  }
+
+  /**
+   * Action taken when an event cannot be parsed cleanly. Public because it is the type of
+   * the public {@code DEFAULT_*_POLICY} constants of this class.
+   */
+  public enum ParseErrorPolicy {
+    /** Log a warning message and do not throw. */
+    WARN,
+    /** Neither log nor throw. */
+    IGNORE,
+    /** Throw a {@code FlumeException}. */
+    REJECT
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/6030c5a3/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java
new file mode 100644
index 0000000..9bc5942
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.flume.sink;
+
+import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.BAD_COLUMN_VALUE_POLICY_PROP;
+import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.MISSING_COLUMN_POLICY_PROP;
+import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.OPERATION_PROP;
+import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.PATTERN_PROP;
+import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.SKIP_BAD_COLUMN_VALUE_PROP;
+import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.SKIP_MISSING_COLUMN_PROP;
+import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.UNMATCHED_ROW_POLICY_PROP;
+import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.WARN_UNMATCHED_ROWS_PROP;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.util.CapturingLogAppender;
+
+/**
+ * Tests how {@link RegexpKuduOperationsProducer} reacts to parse errors (missing column,
+ * bad column value, unmatched row) under the new policy properties, the deprecated
+ * boolean properties, and the default configuration.
+ */
+public class RegexpKuduOperationsProducerParseErrorTest extends BaseKuduTest {
+  private static final String TEST_REGEXP = "(?<key>\\d+),(?<byteFld>\\d+),(?<stringFld>\\w+)";
+  private static final String TEST_REGEXP_MISSING_COLUMN = "(?<key>\\d+),(?<byteFld>\\d+)";
+  private static final String TEST_OPERATION = "insert";
+
+  private static final String ROW_UNMATCHING = "invalid row";
+  private static final String ROW_BAD_COLUMN_VALUE = "1,1000,string";
+  private static final String ROW_MISSING_COLUMN = "1,1";
+
+  private static final String ERROR_MSG_UNMATCHED_ROW =
+      "Failed to match the pattern '" + TEST_REGEXP + "' in '" + ROW_UNMATCHING + "'";
+  private static final String ERROR_MSG_MISSING_COLUMN =
+      "Column 'stringFld' has no matching group in '" + ROW_MISSING_COLUMN + "'";
+  private static final String ERROR_MSG_BAD_COLUMN_VALUE =
+      "Raw value '" + ROW_BAD_COLUMN_VALUE +
+        "' couldn't be parsed to type Type: int8 for column 'byteFld'";
+
+  private static final String POLICY_REJECT = "REJECT";
+  private static final String POLICY_WARN = "WARN";
+  private static final String POLICY_IGNORE = "IGNORE";
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testMissingColumnThrowsExceptionDefaultConfig() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnThrowsExceptionDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(false));
+    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnThrowsException() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_REJECT);
+    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnLogsWarningDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(true));
+    testLogging(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnLogsWarning() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_WARN);
+    testLogging(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnIgnored() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_IGNORE);
+    testIgnored(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMissingColumnConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(false));
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_IGNORE);
+    getProducer(additionalContext);
+  }
+
+  @Test
+  public void testBadColumnValueThrowsExceptionDefaultConfig() throws Exception {
+    Context additionalContext = new Context();
+    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueThrowsExceptionDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(false));
+    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueThrowsException() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_REJECT);
+    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueLogsWarningDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(true));
+    testLogging(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueLogsWarning() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_WARN);
+    testLogging(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueIgnored() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_IGNORE);
+    testIgnored(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadColumnValueConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(false));
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_IGNORE);
+    getProducer(additionalContext);
+  }
+
+  @Test
+  public void testUnmatchedRowLogsWarningWithDefaultConfig() throws Exception {
+    Context additionalContext = new Context();
+    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowThrowsException() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_REJECT);
+    testThrowsException(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowLogsWarningDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(true));
+    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowLogsWarning() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_WARN);
+    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowIgnoredDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(false));
+    testIgnored(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowIgnored() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_IGNORE);
+    testIgnored(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testUnmatchedRowConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(false));
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_IGNORE);
+    getProducer(additionalContext);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testUnKnownPolicyConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, "FORCED");
+    getProducer(additionalContext);
+  }
+
+  /** Processes the event and asserts that the expected message was logged. */
+  private void testLogging(
+      Context additionalContext, String expectedError, String eventBody) throws Exception {
+    String appendedText = processEvent(additionalContext, eventBody);
+    assertTrue(appendedText.contains(expectedError));
+  }
+
+  /** Processes the event and asserts that the given message was not logged. */
+  private void testIgnored(
+      Context additionalContext, String expectedError, String eventBody) throws Exception {
+    String appendedText = processEvent(additionalContext, eventBody);
+    assertFalse(appendedText.contains(expectedError));
+  }
+
+  /** Processes the event expecting a {@link FlumeException} carrying the given message. */
+  private void testThrowsException(
+      Context additionalContext, String expectedError, String eventBody) throws Exception {
+    thrown.expect(FlumeException.class);
+    thrown.expectMessage(expectedError);
+    processEvent(additionalContext, eventBody);
+  }
+
+  /**
+   * Runs a single event through a freshly configured producer.
+   *
+   * @return the text captured from the log while the event was processed
+   */
+  private String processEvent(Context additionalContext, String eventBody) throws Exception {
+    CapturingLogAppender appender = new CapturingLogAppender();
+    RegexpKuduOperationsProducer producer = getProducer(additionalContext);
+    // Detach the appender even when getOperations() throws, so it does not stay
+    // attached and keep capturing the output of later tests.
+    try (Closeable unused = appender.attach()) {
+      producer.getOperations(
+          EventBuilder.withBody(eventBody.getBytes(StandardCharsets.UTF_8)));
+    }
+
+    return appender.getAppendedText();
+  }
+
+  /**
+   * Builds a producer on a new test table, configured with the default test pattern and
+   * operation; entries of {@code additionalContext} override those defaults.
+   */
+  private RegexpKuduOperationsProducer getProducer(Context additionalContext) throws Exception {
+    RegexpKuduOperationsProducer producer = new RegexpKuduOperationsProducer();
+    producer.initialize(createNewTable("test"));
+    Context context = new Context();
+    context.put(PATTERN_PROP, TEST_REGEXP);
+    context.put(OPERATION_PROP, TEST_OPERATION);
+    context.putAll(additionalContext.getParameters());
+    producer.configure(context);
+
+    return producer;
+  }
+
+  /** Creates a table with an INT32 key, an INT8 column and a STRING column. */
+  private KuduTable createNewTable(String tableName) throws Exception {
+    List<ColumnSchema> columns = new ArrayList<>(10);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("byteFld", Type.INT8).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
+    return createTable(tableName, new Schema(columns), createOptions);
+  }
+}


Mime
View raw message