kudu-commits mailing list archives

From mpe...@apache.org
Subject [2/2] kudu git commit: API and style improvements to the Kudu Flume Sink
Date Thu, 08 Sep 2016 19:21:19 GMT
API and style improvements to the Kudu Flume Sink

This patch cleans up the Flume integration code a bit. Specifically:
1. s/EventProducer/OperationsProducer/ since the "KuduEventProducer"
consumed Flume Events and produced Kudu Operations.
2. The KuduOperationsProducer API was changed. The initialize method now
takes just a KuduTable and is called once, after the producer has been
configured. The getOperations method now takes the Event instead of
receiving it through initialize (the new calling sequence is sketched
below).
3. The close method of a KuduOperationsProducer is now called when the
KuduSink is stopped. Previously the interface's comment implied this,
but it was not actually the case.
4. General clean-up and style improvements.
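
The following is a minimal sketch of the calling sequence the sink now
follows; the producerContext, table, batch, and session variables are
illustrative only and not part of this patch:

  KuduOperationsProducer producer = new SimpleKuduOperationsProducer();
  producer.configure(producerContext); // the "producer." sub-properties of the sink config
  producer.initialize(table);          // called once, after the sink opens the KuduTable
  for (Event event : batch) {          // events taken from the channel in one transaction
    for (Operation op : producer.getOperations(event)) {
      session.apply(op);
    }
  }
  producer.close();                    // called when the sink is stopped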

Change-Id: I357df8cac7daa6ce105f9568cc3af09697032eb6
Reviewed-on: http://gerrit.cloudera.org:8080/4320
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fdfcdba2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fdfcdba2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fdfcdba2

Branch: refs/heads/master
Commit: fdfcdba21f1c70b882d763e47a4d7d028bf6e2ea
Parents: d807034
Author: Will Berkeley <wdberkeley@gmail.com>
Authored: Tue Sep 6 13:14:50 2016 -0400
Committer: Mike Percy <mpercy@apache.org>
Committed: Thu Sep 8 19:20:58 2016 +0000

----------------------------------------------------------------------
 docs/release_notes.adoc                         |  11 +
 .../kudu/flume/sink/AvroKuduEventProducer.java  | 284 -------------------
 .../flume/sink/AvroKuduOperationsProducer.java  | 279 ++++++++++++++++++
 .../kudu/flume/sink/KuduEventProducer.java      |  59 ----
 .../kudu/flume/sink/KuduOperationsProducer.java |  56 ++++
 .../org/apache/kudu/flume/sink/KuduSink.java    | 113 ++++----
 .../sink/KuduSinkConfigurationConstants.java    |  20 +-
 .../sink/SimpleKeyedKuduEventProducer.java      |  96 -------
 .../sink/SimpleKeyedKuduOperationsProducer.java | 133 +++++++++
 .../flume/sink/SimpleKuduEventProducer.java     |  84 ------
 .../sink/SimpleKuduOperationsProducer.java      |  90 ++++++
 .../src/test/avro/testAvroEventProducer.avsc    |  11 -
 .../avro/testAvroKuduOperationsProducer.avsc    |  11 +
 .../flume/sink/AvroKuduEventProducerTest.java   | 215 --------------
 .../sink/AvroKuduOperationsProducerTest.java    | 209 ++++++++++++++
 .../flume/sink/KeyedKuduEventProducerTest.java  | 218 --------------
 .../sink/KeyedKuduOperationsProducerTest.java   | 227 +++++++++++++++
 17 files changed, 1085 insertions(+), 1031 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/docs/release_notes.adoc
----------------------------------------------------------------------
diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc
index be89002..fc947b4 100644
--- a/docs/release_notes.adoc
+++ b/docs/release_notes.adoc
@@ -73,6 +73,17 @@ Kudu 1.0.0 are not supported.
   are not recommended and may be removed or modified with no deprecation period
   and without notice in future Kudu releases.
 
+- The KuduEventProducer interface used to process Flume events into Kudu operations
+  for the Kudu Flume Sink has changed, and has been renamed KuduOperationsProducer.
+  The existing KuduEventProducers have been updated for the new interface, and have
+  been renamed similarly.
+
+[[rn_1.0.0_new_features]]
+=== New features
+
+- The Kudu Flume Sink now supports processing events containing Avro-encoded
+  records, using the new AvroKuduOperationsProducer.
+
 [[rn_0.10.0]]
 == Release notes specific to 0.10.0
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java
deleted file mode 100644
index f9ee683..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.flume.conf.ComponentConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-
-/**
- * <p>An Avro serializer that generates one insert or operation per event by deserializing the event
- * body as an Avro record and mapping its fields to columns in a Kudu table.
- * <p><strong>Avro Kudu Event Producer configuration parameters</strong>
- * <table cellpadding=3 cellspacing=0 border=1>
- * <tr><th>Property Name</th>
- * <th>Default</th>
- * <th>Required?</th>
- * <th>Description</th></tr>
- * <tr><td>producer.operation</td>
- * <td>operation</td>
- * <td>No</td>
- * <td>The operation used to write events to Kudu.</td>
- * </tr>
- * <tr><td>producer.schemaPath</td>
- * <td></td>
- * <td>No</td>
- * <td>The location of the Avro schema file used to deserialize the Avro-encoded event bodies.
- * It's used whenever an event does not include its own schema. If not specified, the
- * schema must be specified on a per-event basis, either by url or as a literal.
- * Schemas must be a record type.
- * </td>
- * </tr>
- * </table>
- */
-public class AvroKuduEventProducer implements KuduEventProducer {
-  public static final String OPERATION_PROP = "operation";
-  public static final String SCHEMA_PROP = "schemaPath";
-  public static final String DEFAULT_OPERATION = "upsert";
-  public static final String SCHEMA_URL_HEADER = "flume.avro.schema.url";
-  public static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";
-
-  private String operation;
-  private DatumReader<GenericRecord> reader;
-  private GenericRecord reuse;
-  private KuduTable table;
-  private byte[] payload;
-  private String defaultSchemaURL;
-
-  /**
-   * The binary decoder to reuse for event parsing.
-   */
-  private BinaryDecoder decoder = null;
-
-  public AvroKuduEventProducer() {
-  }
-
-  private static final Configuration conf = new Configuration();
-
-  /**
-   * A cache of schemas retrieved by URL to avoid re-parsing the schema.
-   */
-  private static final LoadingCache<String, Schema> schemasFromURL =
-      CacheBuilder.newBuilder()
-          .build(new CacheLoader<String, Schema>() {
-            @Override
-            public Schema load(String url) throws IOException {
-              Schema.Parser parser = new Schema.Parser();
-              InputStream is = null;
-              try {
-                FileSystem fs = FileSystem.get(URI.create(url), conf);
-                if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) {
-                  is = fs.open(new Path(url));
-                } else {
-                  is = new URL(url).openStream();
-                }
-                return parser.parse(is);
-              } finally {
-                if (is != null) {
-                  is.close();
-                }
-              }
-            }
-          });
-
-  /**
-   * A cache of literal schemas to avoid re-parsing the schema.
-   */
-  private static final LoadingCache<String, Schema> schemasFromLiteral =
-      CacheBuilder.newBuilder()
-          .build(new CacheLoader<String, Schema>() {
-            @Override
-            public Schema load(String literal) {
-              Preconditions.checkNotNull(literal,
-                  "Schema literal cannot be null without a Schema URL");
-              return new Schema.Parser().parse(literal);
-            }
-          });
-
-  /**
-   * A cache of DatumReaders per schema.
-   */
-  private final LoadingCache<Schema, DatumReader<GenericRecord>> readers =
-      CacheBuilder.newBuilder()
-          .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
-            @Override
-            public DatumReader<GenericRecord> load(Schema schema) {
-              return new GenericDatumReader<>(schema);
-            }
-          });
-
-  @Override
-  public void configure(Context context) {
-    this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION);
-
-    String schemaPath = context.getString(SCHEMA_PROP);
-    if (schemaPath != null) {
-      defaultSchemaURL = schemaPath;
-    }
-  }
-
-  @Override
-  public void configure(ComponentConfiguration conf) {
-  }
-
-  // NB: this is called once per event
-  @Override
-  public void initialize(Event event, KuduTable table) {
-    this.payload = event.getBody();
-    this.table = table;
-    this.reader = readers.getUnchecked(getSchema(event));
-  }
-
-  @Override
-  public List<Operation> getOperations() throws FlumeException {
-    decoder = DecoderFactory.get().binaryDecoder(payload, decoder);
-    try {
-      reuse = reader.read(reuse, decoder);
-    } catch (IOException e) {
-      throw new FlumeException("Cannot deserialize event", e);
-    }
-    Operation op;
-    switch (operation.toLowerCase()) {
-      case "upsert":
-        op = table.newUpsert();
-        break;
-      case "insert":
-        op = table.newInsert();
-        break;
-      default:
-        throw new FlumeException(String.format("Unexpected operation %s", operation));
-    }
-    setupOp(op, reuse);
-    return Collections.singletonList(op);
-  }
-
-  private void setupOp(Operation op, GenericRecord record) {
-    PartialRow row = op.getRow();
-    for (ColumnSchema col : table.getSchema().getColumns()) {
-      String name = col.getName();
-      Object value = record.get(name);
-      if (value == null) {
-        if (col.isNullable()) {
-          row.setNull(name);
-        } else {
-          // leave unset for possible Kudu default
-        }
-      } else {
-        // Avro doesn't support 8- or 16-bit integer types, but we'll allow them to be passed as
-        // a larger type.
-        try {
-          switch (col.getType()) {
-            case BOOL:
-              row.addBoolean(name, (boolean) value);
-              break;
-            case INT8:
-              row.addByte(name, (byte) value);
-              break;
-            case INT16:
-              row.addShort(name, (short) value);
-              break;
-            case INT32:
-              row.addInt(name, (int) value);
-              break;
-            case INT64: // Fall through
-            case TIMESTAMP:
-              row.addLong(name, (long) value);
-              break;
-            case FLOAT:
-              row.addFloat(name, (float) value);
-              break;
-            case DOUBLE:
-              row.addDouble(name, (double) value);
-              break;
-            case STRING:
-              row.addString(name, value.toString());
-              break;
-            case BINARY:
-              row.addBinary(name, (byte[]) value);
-              break;
-            default:
-              throw new FlumeException(String.format(
-                  "Unrecognized type %s for column %s", col.getType().toString(), name));
-          }
-        } catch (ClassCastException e) {
-          throw new FlumeException(String.format("Failed to coerce value for column %s to type %s",
-              col.getName(),
-              col.getType()));
-        }
-      }
-    }
-  }
-
-  private Schema getSchema(Event event) throws FlumeException {
-    Map<String, String> headers = event.getHeaders();
-    String schemaURL = headers.get(SCHEMA_URL_HEADER);
-    String schemaLiteral = headers.get(SCHEMA_LITERAL_HEADER);
-    try {
-      if (schemaURL != null) {
-        return schemasFromURL.get(schemaURL);
-      } else if (schemaLiteral != null) {
-        return schemasFromLiteral.get(schemaLiteral);
-      } else if (defaultSchemaURL != null) {
-        return schemasFromURL.get(defaultSchemaURL);
-      } else {
-        throw new FlumeException(
-            String.format("No schema for event! Specify configuration property %s or event header %s",
-                SCHEMA_PROP,
-                SCHEMA_URL_HEADER));
-      }
-    } catch (ExecutionException ex) {
-      throw new FlumeException("Cannot get schema", ex);
-    } catch (UncheckedExecutionException ex) {
-      throw new FlumeException("Cannot parse schema", ex);
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
new file mode 100644
index 0000000..f799162
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.flume.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+/**
+ * <p>An Avro serializer that generates one operation per event by deserializing the event
+ * body as an Avro record and mapping its fields to columns in a Kudu table.
+ * <p><strong>Avro Kudu Operations Producer configuration parameters</strong>
+ * <table cellpadding=3 cellspacing=0 border=1>
+ * <tr><th>Property Name</th>
+ *   <th>Default</th>
+ *   <th>Required?</th>
+ *   <th>Description</th></tr>
+ * <tr>
+ *   <td>producer.operation</td>
+ *   <td>upsert</td>
+ *   <td>No</td>
+ *   <td>The operation used to write events to Kudu.
+ *   Supported operations are 'insert' and 'upsert'</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.schemaPath</td>
+ *   <td></td>
+ *   <td>No</td>
+ *   <td>The location of the Avro schema file used to deserialize the Avro-encoded event bodies.
+ *   It's used whenever an event does not include its own schema. If not specified, the
+ *   schema must be specified on a per-event basis, either by url or as a literal.
+ *   Schemas must be record type.</td>
+ * </tr>
+ * </table>
+ */
+public class AvroKuduOperationsProducer implements KuduOperationsProducer {
+  public static final String OPERATION_PROP = "operation";
+  public static final String SCHEMA_PROP = "schemaPath";
+  public static final String DEFAULT_OPERATION = "upsert";
+  public static final String SCHEMA_URL_HEADER = "flume.avro.schema.url";
+  public static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";
+
+  private String operation;
+  private GenericRecord reuse;
+  private KuduTable table;
+  private String defaultSchemaURL;
+
+  /**
+   * The binary decoder to reuse for event parsing.
+   */
+  private BinaryDecoder decoder = null;
+
+  /**
+   * A cache of schemas retrieved by URL to avoid re-parsing the schema.
+   */
+  private static final LoadingCache<String, Schema> schemasFromURL =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<String, Schema>() {
+            @Override
+            public Schema load(String url) throws IOException {
+              Schema.Parser parser = new Schema.Parser();
+              InputStream is = null;
+              try {
+                FileSystem fs = FileSystem.get(URI.create(url), conf);
+                if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) {
+                  is = fs.open(new Path(url));
+                } else {
+                  is = new URL(url).openStream();
+                }
+                return parser.parse(is);
+              } finally {
+                if (is != null) {
+                  is.close();
+                }
+              }
+            }
+          });
+
+  /**
+   * A cache of literal schemas to avoid re-parsing the schema.
+   */
+  private static final LoadingCache<String, Schema> schemasFromLiteral =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<String, Schema>() {
+            @Override
+            public Schema load(String literal) {
+              Preconditions.checkNotNull(literal,
+                  "Schema literal cannot be null without a Schema URL");
+              return new Schema.Parser().parse(literal);
+            }
+          });
+
+  /**
+   * A cache of DatumReaders per schema.
+   */
+  private static final LoadingCache<Schema, DatumReader<GenericRecord>> readers =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
+            @Override
+            public DatumReader<GenericRecord> load(Schema schema) {
+              return new GenericDatumReader<>(schema);
+            }
+          });
+
+  private static final Configuration conf = new Configuration();
+
+  public AvroKuduOperationsProducer() {
+  }
+
+  @Override
+  public void configure(Context context) {
+    this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION);
+
+    String schemaPath = context.getString(SCHEMA_PROP);
+    if (schemaPath != null) {
+      defaultSchemaURL = schemaPath;
+    }
+  }
+
+  @Override
+  public void initialize(KuduTable table) {
+    this.table = table;
+  }
+
+  @Override
+  public List<Operation> getOperations(Event event) throws FlumeException {
+    DatumReader<GenericRecord> reader = readers.getUnchecked(getSchema(event));
+    decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
+    try {
+      reuse = reader.read(reuse, decoder);
+    } catch (IOException e) {
+      throw new FlumeException("Cannot deserialize event", e);
+    }
+    Operation op;
+    switch (operation.toLowerCase()) {
+      case "upsert":
+        op = table.newUpsert();
+        break;
+      case "insert":
+        op = table.newInsert();
+        break;
+      default:
+        throw new FlumeException(String.format("Unexpected operation %s", operation));
+    }
+    setupOp(op, reuse);
+    return Collections.singletonList(op);
+  }
+
+  private void setupOp(Operation op, GenericRecord record) {
+    PartialRow row = op.getRow();
+    for (ColumnSchema col : table.getSchema().getColumns()) {
+      String name = col.getName();
+      Object value = record.get(name);
+      if (value == null) {
+        if (col.isNullable()) {
+          row.setNull(name);
+        } else {
+          // leave unset for possible Kudu default
+        }
+      } else {
+        // Avro doesn't support 8- or 16-bit integer types, but we'll allow them to be passed as
+        // a larger type.
+        try {
+          switch (col.getType()) {
+            case BOOL:
+              row.addBoolean(name, (boolean) value);
+              break;
+            case INT8:
+              row.addByte(name, (byte) value);
+              break;
+            case INT16:
+              row.addShort(name, (short) value);
+              break;
+            case INT32:
+              row.addInt(name, (int) value);
+              break;
+            case INT64: // Fall through
+            case TIMESTAMP:
+              row.addLong(name, (long) value);
+              break;
+            case FLOAT:
+              row.addFloat(name, (float) value);
+              break;
+            case DOUBLE:
+              row.addDouble(name, (double) value);
+              break;
+            case STRING:
+              row.addString(name, value.toString());
+              break;
+            case BINARY:
+              row.addBinary(name, (byte[]) value);
+              break;
+            default:
+              throw new FlumeException(String.format(
+                  "Unrecognized type %s for column %s", col.getType().toString(), name));
+          }
+        } catch (ClassCastException e) {
+          throw new FlumeException(
+              String.format("Failed to coerce value for column '%s' to type %s",
+              col.getName(),
+              col.getType()));
+        }
+      }
+    }
+  }
+
+  private Schema getSchema(Event event) throws FlumeException {
+    Map<String, String> headers = event.getHeaders();
+    String schemaURL = headers.get(SCHEMA_URL_HEADER);
+    String schemaLiteral = headers.get(SCHEMA_LITERAL_HEADER);
+    try {
+      if (schemaURL != null) {
+        return schemasFromURL.get(schemaURL);
+      } else if (schemaLiteral != null) {
+        return schemasFromLiteral.get(schemaLiteral);
+      } else if (defaultSchemaURL != null) {
+        return schemasFromURL.get(defaultSchemaURL);
+      } else {
+        throw new FlumeException(
+            String.format("No schema for event. " +
+                  "Specify configuration property '%s' or event header '%s'",
+                SCHEMA_PROP,
+                SCHEMA_URL_HEADER));
+      }
+    } catch (ExecutionException ex) {
+      throw new FlumeException("Cannot get schema", ex);
+    } catch (UncheckedExecutionException ex) {
+      throw new FlumeException("Cannot parse schema", ex);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
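
For illustration only (not part of this patch), an event suitable for this
producer could be built as follows, assuming a target Kudu table whose
primary key is an INT32 column "id" with one additional STRING column
"payload"; the schema literal and values are hypothetical:

  import java.io.ByteArrayOutputStream;
  import java.util.HashMap;
  import java.util.Map;
  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericData;
  import org.apache.avro.generic.GenericDatumWriter;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.avro.io.BinaryEncoder;
  import org.apache.avro.io.EncoderFactory;
  import org.apache.flume.Event;
  import org.apache.flume.event.EventBuilder;
  import org.apache.kudu.flume.sink.AvroKuduOperationsProducer;

  public class AvroEventExample {
    public static Event buildEvent() throws Exception {
      // Hypothetical record schema matching the assumed table columns.
      String schemaLiteral = "{\"type\":\"record\",\"name\":\"ExampleRec\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"int\"},"
          + "{\"name\":\"payload\",\"type\":\"string\"}]}";
      Schema schema = new Schema.Parser().parse(schemaLiteral);

      GenericRecord record = new GenericData.Record(schema);
      record.put("id", 1);
      record.put("payload", "hello kudu");

      // Serialize the record with the Avro binary encoding the producer expects.
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
      new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
      encoder.flush();

      // Ship the schema as a per-event header; producer.schemaPath or the
      // flume.avro.schema.url header could be used instead.
      Map<String, String> headers = new HashMap<>();
      headers.put(AvroKuduOperationsProducer.SCHEMA_LITERAL_HEADER, schemaLiteral);
      return EventBuilder.withBody(out.toByteArray(), headers);
    }
  }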

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java
deleted file mode 100644
index 95f63b1..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.kudu.flume.sink;
-
-import org.apache.flume.Event;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.conf.ConfigurableComponent;
-import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.annotations.InterfaceStability;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-
-import java.util.List;
-
-/**
- * Interface for an event producer which produces Kudu Operations to write
- * the headers and body of an event in a Kudu table. This is configurable,
- * so any config params required should be taken through this. The columns
- * should exist in the table specified in the configuration for the KuduSink.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface KuduEventProducer extends Configurable, ConfigurableComponent {
-  /**
-   * Initialize the event producer.
-   * @param event to be written to Kudu
-   * @param table the KuduTable object used for creating Kudu Operation objects
-   */
-  void initialize(Event event, KuduTable table);
-
-  /**
-   * Get the operations that should be written out to Kudu as a result of this
-   * event. This list is written to Kudu using the Kudu client API.
-   * @return List of {@link org.apache.kudu.client.Operation} which
-   * are written as such to Kudu
-   */
-  List<Operation> getOperations();
-
-  /*
-   * Clean up any state. This will be called when the sink is being stopped.
-   */
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java
new file mode 100644
index 0000000..8816e95
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kudu.flume.sink;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.annotations.InterfaceStability;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+
+import java.util.List;
+
+/**
+ * Interface for an operations producer that produces Kudu Operations from
+ * Flume events.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface KuduOperationsProducer extends Configurable, AutoCloseable {
+  /**
+   * Initializes the operations producer. Called between configure and
+   * getOperations.
+   * @param table the KuduTable used to create Kudu Operation objects
+   */
+  void initialize(KuduTable table);
+
+  /**
+   * Returns the operations that should be written to Kudu as a result of this
+   * event.
+   * @return List of {@link org.apache.kudu.client.Operation} that
+   * should be written to Kudu
+   */
+  List<Operation> getOperations(Event event);
+
+  /**
+   * Cleans up any state. Called when the sink is stopped.
+   */
+  void close();
+}
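
As an illustration of the new contract (not part of this patch), a minimal
producer might upsert each event body into a single STRING column; the class
name, the "column" sub-property, and the assumption that the column is also
the table's primary key are all hypothetical:

  package com.example.flume;

  import java.nio.charset.StandardCharsets;
  import java.util.Collections;
  import java.util.List;
  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.kudu.client.KuduTable;
  import org.apache.kudu.client.Operation;
  import org.apache.kudu.client.PartialRow;
  import org.apache.kudu.flume.sink.KuduOperationsProducer;

  public class LineKuduOperationsProducer implements KuduOperationsProducer {
    private KuduTable table;
    private String column;

    @Override
    public void configure(Context context) {
      // Reads the hypothetical producer.column sub-property, defaulting to "line".
      column = context.getString("column", "line");
    }

    @Override
    public void initialize(KuduTable table) {
      // Called once by the sink, after the table has been opened.
      this.table = table;
    }

    @Override
    public List<Operation> getOperations(Event event) {
      // One upsert per event; assumes "column" is the table's sole (STRING) key column.
      Operation op = table.newUpsert();
      PartialRow row = op.getRow();
      row.addString(column, new String(event.getBody(), StandardCharsets.UTF_8));
      return Collections.singletonList(op);
    }

    @Override
    public void close() {
    }
  }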

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
index 8c206d8..b80462c 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
@@ -18,6 +18,13 @@
  */
 package org.apache.kudu.flume.sink;
 
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.BATCH_SIZE;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TIMEOUT_MILLIS;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -53,13 +60,13 @@ import java.util.List;
  * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
  * <tr><td>channel</td><td></td><td>Yes</td><td>The name of the Flume channel to read from.</td></tr>
  * <tr><td>type</td><td></td><td>Yes</td><td>Component name. Must be {@code org.apache.kudu.flume.sink.KuduSink}</td></tr>
- * <tr><td>masterAddresses</td><td></td><td>Yes</td><td>Comma-separated list of "host:port" pairs of the Kudu master servers. The port is optional.</td></tr>
+ * <tr><td>masterAddresses</td><td></td><td>Yes</td><td>Comma-separated list of "host:port" Kudu master addresses. The port is optional.</td></tr>
  * <tr><td>tableName</td><td></td><td>Yes</td><td>The name of the Kudu table to write to.</td></tr>
  * <tr><td>batchSize</td><td>100</td><td>No</td><td>The maximum number of events the sink will attempt to take from the channel per transaction.</td></tr>
- * <tr><td>ignoreDuplicateRows</td><td>true</td><td>No</td><td>Whether to ignore errors indicating that we attempted to insert duplicate rows into Kudu.</td></tr>
+ * <tr><td>ignoreDuplicateRows</td><td>true</td><td>No</td><td>Whether to ignore duplicate primary key errors caused by inserts.</td></tr>
  * <tr><td>timeoutMillis</td><td>10000</td><td>No</td><td>Timeout period for Kudu write operations, in milliseconds.</td></tr>
- * <tr><td>producer</td><td>{@link org.apache.kudu.flume.sink.SimpleKuduEventProducer}</td><td>No</td><td>The fully qualified class name of the {@link KuduEventProducer} the sink should use.</td></tr>
- * <tr><td>producer.*</td><td></td><td>(Varies by event producer)</td><td>Configuration properties to pass to the event producer implementation.</td></tr>
+ * <tr><td>producer</td><td>{@link SimpleKuduOperationsProducer}</td><td>No</td><td>The fully qualified class name of the {@link KuduOperationsProducer} the sink should use.</td></tr>
+ * <tr><td>producer.*</td><td></td><td>(Varies by operations producer)</td><td>Configuration properties to pass to the operations producer implementation.</td></tr>
  * </table>
  *
  * <p><strong>Installation</strong>
@@ -78,8 +85,8 @@ public class KuduSink extends AbstractSink implements Configurable {
   private static final Long DEFAULT_BATCH_SIZE = 100L;
   private static final Long DEFAULT_TIMEOUT_MILLIS =
           AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
-  private static final String DEFAULT_KUDU_EVENT_PRODUCER =
-          "org.apache.kudu.flume.sink.SimpleKuduEventProducer";
+  private static final String DEFAULT_KUDU_OPERATION_PRODUCER =
+          "org.apache.kudu.flume.sink.SimpleKuduOperationsProducer";
   private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
 
   private String masterAddresses;
@@ -90,9 +97,7 @@ public class KuduSink extends AbstractSink implements Configurable {
   private KuduTable table;
   private KuduSession session;
   private KuduClient client;
-  private KuduEventProducer eventProducer;
-  private String eventProducerType;
-  private Context producerContext;
+  private KuduOperationsProducer operationsProducer;
   private SinkCounter sinkCounter;
 
   public KuduSink() {
@@ -107,10 +112,10 @@ public class KuduSink extends AbstractSink implements Configurable {
 
   @Override
   public void start() {
-    Preconditions.checkState(table == null && session == null, "Please call stop " +
-        "before calling start on an old instance.");
+    Preconditions.checkState(table == null && session == null,
+        "Please call stop before calling start on an old instance.");
 
-    // This is not null only inside tests
+    // client is not null only inside tests
     if (client == null) {
       client = new KuduClient.KuduClientBuilder(masterAddresses).build();
     }
@@ -123,10 +128,11 @@ public class KuduSink extends AbstractSink implements Configurable {
       table = client.openTable(tableName);
     } catch (Exception e) {
       sinkCounter.incrementConnectionFailedCount();
-      String msg = String.format("Could not open table '%s' from Kudu", tableName);
+      String msg = String.format("Could not open Kudu table '%s'", tableName);
       logger.error(msg, e);
       throw new FlumeException(msg, e);
     }
+    operationsProducer.initialize(table);
 
     super.start();
     sinkCounter.incrementConnectionCreatedCount();
@@ -135,6 +141,13 @@ public class KuduSink extends AbstractSink implements Configurable {
 
   @Override
   public void stop() {
+    Exception ex = null;
+    try {
+      operationsProducer.close();
+    } catch (Exception e) {
+      ex = e;
+      logger.error("Error closing operations producer", e);
+    }
     try {
       if (client != null) {
         client.shutdown();
@@ -143,53 +156,52 @@ public class KuduSink extends AbstractSink implements Configurable {
       table = null;
       session = null;
     } catch (Exception e) {
-      throw new FlumeException("Error closing client.", e);
+      ex = e;
+      logger.error("Error closing client", e);
     }
     sinkCounter.incrementConnectionClosedCount();
     sinkCounter.stop();
+    if (ex != null) {
+      throw new FlumeException("Error stopping sink", ex);
+    }
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public void configure(Context context) {
-    masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES);
-    tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME);
-
-    batchSize = context.getLong(
-            KuduSinkConfigurationConstants.BATCH_SIZE, DEFAULT_BATCH_SIZE);
-    timeoutMillis = context.getLong(
-            KuduSinkConfigurationConstants.TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS);
-    ignoreDuplicateRows = context.getBoolean(
-            KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS);
-    eventProducerType = context.getString(KuduSinkConfigurationConstants.PRODUCER);
-
+    masterAddresses = context.getString(MASTER_ADDRESSES);
     Preconditions.checkNotNull(masterAddresses,
-        "Master address cannot be empty, please specify '" +
-                KuduSinkConfigurationConstants.MASTER_ADDRESSES +
-                "' in configuration file");
+        "Missing master addresses. Please specify property '$s'.",
+        MASTER_ADDRESSES);
+
+    tableName = context.getString(TABLE_NAME);
     Preconditions.checkNotNull(tableName,
-        "Table name cannot be empty, please specify '" +
-                KuduSinkConfigurationConstants.TABLE_NAME +
-                "' in configuration file");
-
-    // Check for event producer, if null set event producer type.
-    if (eventProducerType == null || eventProducerType.isEmpty()) {
-      eventProducerType = DEFAULT_KUDU_EVENT_PRODUCER;
-      logger.info("No Kudu event producer defined, will use default");
+        "Missing table name. Please specify property '%s'",
+        TABLE_NAME);
+
+    batchSize = context.getLong(BATCH_SIZE, DEFAULT_BATCH_SIZE);
+    timeoutMillis = context.getLong(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS);
+    ignoreDuplicateRows = context.getBoolean(IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS);
+    String operationProducerType = context.getString(PRODUCER);
+
+    // Check for operations producer, if null set default operations producer type.
+    if (operationProducerType == null || operationProducerType.isEmpty()) {
+      operationProducerType = DEFAULT_KUDU_OPERATION_PRODUCER;
+      logger.warn("No Kudu operations producer provided, using default");
     }
 
-    producerContext = new Context();
+    Context producerContext = new Context();
     producerContext.putAll(context.getSubProperties(
             KuduSinkConfigurationConstants.PRODUCER_PREFIX));
 
     try {
-      Class<? extends KuduEventProducer> clazz =
-          (Class<? extends KuduEventProducer>)
-          Class.forName(eventProducerType);
-      eventProducer = clazz.newInstance();
-      eventProducer.configure(producerContext);
+      Class<? extends KuduOperationsProducer> clazz =
+          (Class<? extends KuduOperationsProducer>)
+          Class.forName(operationProducerType);
+      operationsProducer = clazz.newInstance();
+      operationsProducer.configure(producerContext);
     } catch (Exception e) {
-      logger.error("Could not instantiate Kudu event producer." , e);
+      logger.error("Could not instantiate Kudu operations producer" , e);
       Throwables.propagate(e);
     }
     sinkCounter = new SinkCounter(this.getName());
@@ -202,9 +214,9 @@ public class KuduSink extends AbstractSink implements Configurable {
   @Override
   public Status process() throws EventDeliveryException {
     if (session.hasPendingOperations()) {
-      // If for whatever reason we have pending operations then just refuse to process
-      // and tell caller to try again a bit later. We don't want to pile on the kudu
-      // session object.
+      // If for whatever reason we have pending operations, refuse to process
+      // more and tell the caller to try again a bit later. We don't want to
+      // pile on the KuduSession.
       return Status.BACKOFF;
     }
 
@@ -221,8 +233,7 @@ public class KuduSink extends AbstractSink implements Configurable {
           break;
         }
 
-        eventProducer.initialize(event, table);
-        List<Operation> operations = eventProducer.getOperations();
+        List<Operation> operations = operationsProducer.getOperations(event);
         for (Operation o : operations) {
           session.apply(o);
         }
@@ -280,10 +291,4 @@ public class KuduSink extends AbstractSink implements Configurable {
 
     return Status.BACKOFF;
   }
-
-  @VisibleForTesting
-  @InterfaceAudience.Private
-  KuduEventProducer getEventProducer() {
-    return eventProducer;
-  }
 }
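
For reference (not part of this patch), a hypothetical programmatic
configuration exercising the documented properties; the master address,
table name, and in-memory channel are placeholders, and starting the sink
requires a reachable Kudu cluster:

  import org.apache.flume.Channel;
  import org.apache.flume.Context;
  import org.apache.flume.channel.MemoryChannel;
  import org.apache.flume.conf.Configurables;
  import org.apache.kudu.flume.sink.KuduSink;
  import org.apache.kudu.flume.sink.KuduSinkConfigurationConstants;

  public class KuduSinkConfigExample {
    public static KuduSink buildSink() {
      Context sinkContext = new Context();
      // Placeholder master address and table name.
      sinkContext.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, "kudu-master-1:7051");
      sinkContext.put(KuduSinkConfigurationConstants.TABLE_NAME, "flume_events");
      sinkContext.put(KuduSinkConfigurationConstants.PRODUCER,
          "org.apache.kudu.flume.sink.SimpleKuduOperationsProducer");
      // Sub-properties under "producer." are handed to the operations producer.
      sinkContext.put(KuduSinkConfigurationConstants.PRODUCER_PREFIX + "payloadColumn", "payload");

      KuduSink sink = new KuduSink();
      Configurables.configure(sink, sinkContext);

      Channel channel = new MemoryChannel();
      Configurables.configure(channel, new Context());
      sink.setChannel(channel);
      sink.start(); // opens the table and initializes the operations producer
      return sink;
    }
  }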

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
index 7da815e..e5f7342 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
@@ -22,15 +22,13 @@ package org.apache.kudu.flume.sink;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
 
-/**
- * Constants used for configuration of KuduSink
- */
-
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class KuduSinkConfigurationConstants {
   /**
-   * Comma-separated list of "host:port" pairs of the masters (port optional).
+   * Comma-separated list of "host:port" Kudu master addresses.
+   * The port is optional and defaults to the Kudu Java client's default master
+   * port.
    */
   public static final String MASTER_ADDRESSES = "masterAddresses";
 
@@ -40,18 +38,20 @@ public class KuduSinkConfigurationConstants {
   public static final String TABLE_NAME = "tableName";
 
   /**
-   * The fully qualified class name of the Kudu event producer the sink should use.
+   * The fully qualified class name of the KuduOperationsProducer class that the
+   * sink should use.
    */
   public static final String PRODUCER = "producer";
 
   /**
-   * Configuration to pass to the Kudu event producer.
+   * Prefix for configuration parameters that are passed to the
+   * KuduOperationsProducer.
    */
   public static final String PRODUCER_PREFIX = PRODUCER + ".";
 
   /**
-   * Maximum number of events the sink should take from the channel per
-   * transaction, if available.
+   * Maximum number of events that the sink should take from the channel per
+   * transaction.
    */
   public static final String BATCH_SIZE = "batchSize";
 
@@ -61,7 +61,7 @@ public class KuduSinkConfigurationConstants {
   public static final String TIMEOUT_MILLIS = "timeoutMillis";
 
   /**
-   * Whether to ignore errors indicating that we attempted to insert duplicate rows into Kudu.
+   * Whether to ignore duplicate primary key errors caused by inserts.
    */
   public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows";
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java
deleted file mode 100644
index 534fd33..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.flume.conf.ComponentConfiguration;
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.Upsert;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * <p>A simple serializer that generates one {@link Insert} or {@link Upsert} per {@link Event} by writing the event
- * body into a BINARY column. The pair (key column name, key column value) should be a header in the {@link Event};
- * the column name is configurable but the column type must be STRING. Multiple key columns are not supported.</p>
- *
- * <p><strong>Simple Keyed Kudu Event Producer configuration parameters</strong></p>
- *
- * <table cellpadding=3 cellspacing=0 border=1>
- * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
- * <tr><td>producer.payloadColumn</td><td>payload</td><td>No</td><td>The name of the BINARY column to write the Flume event body to.</td></tr>
- * <tr><td>producer.keyColumn</td><td>key</td><td>No</td><td>The name of the STRING key column of the target Kudu table.</td></tr>
- * <tr><td>producer.upsert</td><td>false</td><td>No</td><td>Whether to insert or upsert events.</td></tr>
- * </table>
- */
-public class SimpleKeyedKuduEventProducer implements KuduEventProducer {
-  private byte[] payload;
-  private String key;
-  private KuduTable table;
-  private String payloadColumn;
-  private String keyColumn;
-  private boolean upsert;
-
-  public SimpleKeyedKuduEventProducer(){
-  }
-
-  @Override
-  public void configure(Context context) {
-    payloadColumn = context.getString("payloadColumn","payload");
-    keyColumn = context.getString("keyColumn", "key");
-    upsert = context.getBoolean("upsert", false);
-  }
-
-  @Override
-  public void configure(ComponentConfiguration conf) {
-  }
-
-  @Override
-  public void initialize(Event event, KuduTable table) {
-    this.payload = event.getBody();
-    this.key = event.getHeaders().get(keyColumn);
-    this.table = table;
-  }
-
-  @Override
-  public List<Operation> getOperations() throws FlumeException {
-    try {
-      Operation op = (upsert) ? table.newUpsert() : table.newInsert();
-      PartialRow row = op.getRow();
-      row.addString(keyColumn, key);
-      row.addBinary(payloadColumn, payload);
-
-      return Collections.singletonList(op);
-    } catch (Exception e){
-      throw new FlumeException("Failed to create Kudu Operation object!", e);
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java
new file mode 100644
index 0000000..8360df8
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.flume.sink;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * <p>A simple serializer that generates one {@link Insert} or {@link Upsert}
+ * per {@link Event} by writing the event body into a BINARY column. The pair
+ * (key column name, key column value) should be a header in the {@link Event};
+ * the column name is configurable but the column type must be STRING. Multiple
+ * key columns are not supported.
+ *
+ * <p><strong>Simple Keyed Kudu Operations Producer configuration parameters</strong>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1>
+ * <tr>
+ *   <th>Property Name</th>
+ *   <th>Default</th>
+ *   <th>Required?</th>
+ *   <th>Description</th>
+ * </tr>
+ * <tr>
+ *   <td>producer.payloadColumn</td>
+ *   <td>payload</td>
+ *   <td>No</td>
+ *   <td>The name of the BINARY column to write the Flume event body to.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.keyColumn</td>
+ *   <td>key</td>
+ *   <td>No</td>
+ *   <td>The name of the STRING key column of the target Kudu table.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.operation</td>
+ *   <td>upsert</td>
+ *   <td>No</td>
+ *   <td>The operation used to write events to Kudu. Supported operations
+ *   are 'insert' and 'upsert'</td>
+ * </tr>
+ * </table>
+ */
+public class SimpleKeyedKuduOperationsProducer implements KuduOperationsProducer {
+  public static final String PAYLOAD_COLUMN_PROP = "payloadColumn";
+  public static final String PAYLOAD_COLUMN_DEFAULT = "payload";
+  public static final String KEY_COLUMN_PROP = "keyColumn";
+  public static final String KEY_COLUMN_DEFAULT = "key";
+  public static final String OPERATION_PROP = "operation";
+  public static final String OPERATION_DEFAULT = "upsert";
+
+  private KuduTable table;
+  private String payloadColumn;
+  private String keyColumn;
+  private String operation;
+
+  public SimpleKeyedKuduOperationsProducer(){
+  }
+
+  @Override
+  public void configure(Context context) {
+    payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT);
+    keyColumn = context.getString(KEY_COLUMN_PROP, KEY_COLUMN_DEFAULT);
+    operation = context.getString(OPERATION_PROP, OPERATION_DEFAULT);
+  }
+
+  @Override
+  public void initialize(KuduTable table) {
+    this.table = table;
+  }
+
+  @Override
+  public List<Operation> getOperations(Event event) throws FlumeException {
+    String key = event.getHeaders().get(keyColumn);
+    if (key == null) {
+      throw new FlumeException(
+          String.format("No value provided for key column %s", keyColumn));
+    }
+    try {
+      Operation op;
+      switch (operation.toLowerCase()) {
+        case "upsert":
+          op = table.newUpsert();
+          break;
+        case "insert":
+          op = table.newInsert();
+          break;
+        default:
+          throw new FlumeException(
+              String.format("Unexpected operation %s", operation));
+      }
+      PartialRow row = op.getRow();
+      row.addString(keyColumn, key);
+      row.addBinary(payloadColumn, event.getBody());
+
+      return Collections.singletonList(op);
+    } catch (Exception e){
+      throw new FlumeException("Failed to create Kudu Operation object", e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
+
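
A hypothetical event (not part of this patch) that this producer turns into
a single upsert, assuming the sink's table has a STRING key column named
"key" and a BINARY column named "payload", with the producer left at its
defaults:

  import java.nio.charset.StandardCharsets;
  import java.util.HashMap;
  import java.util.Map;
  import org.apache.flume.Event;
  import org.apache.flume.event.EventBuilder;

  public class KeyedEventExample {
    public static Event buildEvent() {
      // The (key column name, key column value) pair travels as an event header.
      Map<String, String> headers = new HashMap<>();
      headers.put("key", "row-0001");
      // The event body is written to the BINARY "payload" column.
      return EventBuilder.withBody("some payload bytes".getBytes(StandardCharsets.UTF_8), headers);
    }
  }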

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java
deleted file mode 100644
index 2faf1a1..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.flume.conf.ComponentConfiguration;
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * <p>A simple serializer that generates one {@link Insert} per {@link Event} by writing the event
- * body into a BINARY column. The headers are discarded.</p>
- *
- * <p><strong>Simple Kudu Event Producer configuration parameters</strong></p>
- *
- * <table cellpadding=3 cellspacing=0 border=1>
- * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
- * <tr><td>producer.payloadColumn</td><td>payload</td><td>No</td><td>The name of the BINARY column to write the Flume the event body to.</td></tr>
- * </table>
- */
-public class SimpleKuduEventProducer implements KuduEventProducer {
-  private byte[] payload;
-  private KuduTable table;
-  private String payloadColumn;
-
-  public SimpleKuduEventProducer(){
-  }
-
-  @Override
-  public void configure(Context context) {
-    payloadColumn = context.getString("payloadColumn","payload");
-  }
-
-  @Override
-  public void configure(ComponentConfiguration conf) {
-  }
-
-  @Override
-  public void initialize(Event event, KuduTable table) {
-    this.payload = event.getBody();
-    this.table = table;
-  }
-
-  @Override
-  public List<Operation> getOperations() throws FlumeException {
-    try {
-      Insert insert = table.newInsert();
-      PartialRow row = insert.getRow();
-      row.addBinary(payloadColumn, payload);
-
-      return Collections.singletonList((Operation) insert);
-    } catch (Exception e){
-      throw new FlumeException("Failed to create Kudu Insert object!", e);
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java
new file mode 100644
index 0000000..f5f5838
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.flume.sink;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * <p>A simple serializer that generates one {@link Insert} per {@link Event}
+ * by writing the event body into a BINARY column. The headers are discarded.
+ *
+ * <p><strong>Simple Kudu Operations Producer configuration parameters</strong>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1>
+ * <tr>
+ *   <th>Property Name</th>
+ *   <th>Default</th>
+ *   <th>Required?</th>
+ *   <th>Description</th>
+ * </tr>
+ * <tr>
+ *   <td>producer.payloadColumn</td>
+ *   <td>payload</td>
+ *   <td>No</td>
+ *   <td>The name of the BINARY column to write the Flume event body to.</td>
+ * </tr>
+ * </table>
+ */
+public class SimpleKuduOperationsProducer implements KuduOperationsProducer {
+  public static final String PAYLOAD_COLUMN_PROP = "payloadColumn";
+  public static final String PAYLOAD_COLUMN_DEFAULT = "payload";
+
+  private KuduTable table;
+  private String payloadColumn;
+
+  public SimpleKuduOperationsProducer() {
+  }
+
+  @Override
+  public void configure(Context context) {
+    payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT);
+  }
+
+  @Override
+  public void initialize(KuduTable table) {
+    this.table = table;
+  }
+
+  @Override
+  public List<Operation> getOperations(Event event) throws FlumeException {
+    try {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addBinary(payloadColumn, event.getBody());
+
+      return Collections.singletonList((Operation) insert);
+    } catch (Exception e) {
+      throw new FlumeException("Failed to create Kudu Insert object", e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
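
For reference, the renamed producer's lifecycle, as the sink now drives it, can be exercised directly along the following lines. This is a minimal sketch and not part of the patch: the master address localhost:7051, the table name flume_payloads, and the standalone KuduSession are illustrative stand-ins for what KuduSink manages internally.

import java.util.List;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.flume.sink.SimpleKuduOperationsProducer;

public class SimpleOperationsProducerSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder master address and table name; adjust for the target cluster.
    KuduClient client = new KuduClient.KuduClientBuilder("localhost:7051").build();
    KuduTable table = client.openTable("flume_payloads");
    KuduSession session = client.newSession();

    // Lifecycle as driven by the sink: configure once, initialize once,
    // one getOperations() call per Flume event, close() on shutdown.
    SimpleKuduOperationsProducer producer = new SimpleKuduOperationsProducer();
    producer.configure(new Context(ImmutableMap.of("payloadColumn", "payload")));
    producer.initialize(table);

    Event event = EventBuilder.withBody("hello kudu", Charsets.UTF_8);
    List<Operation> ops = producer.getOperations(event);
    for (Operation op : ops) {
      session.apply(op);
    }
    session.close();  // flushes any pending operations

    producer.close();
    client.close();
  }
}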

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc b/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc
deleted file mode 100644
index d86955c..0000000
--- a/java/kudu-flume-sink/src/test/avro/testAvroEventProducer.avsc
+++ /dev/null
@@ -1,11 +0,0 @@
-{"namespace": "org.apache.kudu.flume.sink",
-  "type": "record",
-  "name": "AvroKuduEventProducerTestRecord",
-  "fields": [
-    {"name": "key", "type": "int"},
-    {"name": "longField",  "type": "long"},
-    {"name": "doubleField",  "type": "double"},
-    {"name": "nullableField",  "type": ["string", "null"]},
-    {"name": "stringField", "type": "string"}
-  ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc b/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc
new file mode 100644
index 0000000..b562c3a
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/avro/testAvroKuduOperationsProducer.avsc
@@ -0,0 +1,11 @@
+{"namespace": "org.apache.kudu.flume.sink",
+  "type": "record",
+  "name": "AvroKuduOperationsProducerTestRecord",
+  "fields": [
+    {"name": "key", "type": "int"},
+    {"name": "longField",  "type": "long"},
+    {"name": "doubleField",  "type": "double"},
+    {"name": "nullableField",  "type": ["string", "null"]},
+    {"name": "stringField", "type": "string"}
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java
deleted file mode 100644
index c1de448..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduEventProducerTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import static org.apache.kudu.flume.sink.AvroKuduEventProducer.SCHEMA_LITERAL_HEADER;
-import static org.apache.kudu.flume.sink.AvroKuduEventProducer.SCHEMA_PROP;
-import static org.apache.kudu.flume.sink.AvroKuduEventProducer.SCHEMA_URL_HEADER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.BaseKuduTest;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class AvroKuduEventProducerTest extends BaseKuduTest {
-  private static final String schemaPath = "src/test/avro/testAvroEventProducer.avsc";
-  private static String schemaLiteral;
-
-  private static org.apache.avro.Schema schema;
-
-  enum SchemaLocation {
-    GLOBAL, URL, LITERAL
-  }
-
-  @BeforeClass
-  public static void setupAvroSchemaBeforeClass() {
-    org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
-    try {
-      schemaLiteral = Files.toString(new File(schemaPath), Charsets.UTF_8);
-      schema = parser.parse(new File(schemaPath));
-    } catch (IOException e) {
-      throw new FlumeException("Unable to open and parse schema file!", e);
-    }
-  }
-
-  @Test
-  public void testEmptyChannel() throws Exception {
-    testEvents(0, SchemaLocation.GLOBAL);
-  }
-
-  @Test
-  public void testOneEvent() throws Exception {
-    testEvents(1, SchemaLocation.GLOBAL);
-  }
-
-  @Test
-  public void testThreeEvents() throws Exception {
-    testEvents(3, SchemaLocation.GLOBAL);
-  }
-
-  @Test
-  public void testThreeEventsSchemaURLInEvent() throws Exception {
-    testEvents(3, SchemaLocation.URL);
-  }
-
-  @Test
-  public void testThreeEventsSchemaLiteralInEvent() throws Exception {
-    testEvents(3, SchemaLocation.LITERAL);
-  }
-
-  private void testEvents(int eventCount, SchemaLocation schemaLocation)
-      throws Exception {
-    KuduTable table = createNewTable(
-        String.format("test%sevents%s", eventCount, schemaLocation));
-    String tableName = table.getName();
-    String schemaURI = new File(schemaPath).getAbsoluteFile().toURI().toString();
-    Context ctx = schemaLocation != SchemaLocation.GLOBAL ? new Context()
-        : new Context(ImmutableMap.of(PRODUCER_PREFIX + SCHEMA_PROP, schemaURI));
-    KuduSink sink = createSink(tableName, ctx);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    writeEventsToChannel(channel, eventCount, schemaLocation);
-    tx.commit();
-    tx.close();
-
-    Sink.Status status = sink.process();
-    if (eventCount == 0) {
-      assertEquals("incorrect status for empty channel", status, Sink.Status.BACKOFF);
-    } else {
-      assertEquals("incorrect status for non-empty channel", status, Sink.Status.READY);
-    }
-
-    List<String> answers = makeAnswers(eventCount);
-    List<String> rows = scanTableToStrings(table);
-    assertEquals("wrong number of rows inserted", answers.size(), rows.size());
-    assertArrayEquals("wrong rows inserted", answers.toArray(), rows.toArray());
-  }
-
-  private KuduTable createNewTable(String tableName) throws Exception {
-    ArrayList<ColumnSchema> columns = new ArrayList<>(5);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING).nullable(true).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build());
-    CreateTableOptions createOptions =
-        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"))
-            .setNumReplicas(1);
-    return createTable(tableName, new Schema(columns), createOptions);
-  }
-
-  private KuduSink createSink(String tableName, Context ctx) {
-    KuduSink sink = new KuduSink(syncClient);
-    HashMap<String, String> parameters = new HashMap<>();
-    parameters.put(TABLE_NAME, tableName);
-    parameters.put(MASTER_ADDRESSES, getMasterAddresses());
-    parameters.put(PRODUCER,
-        "org.apache.kudu.flume.sink.AvroKuduEventProducer");
-    Context context = new Context(parameters);
-    context.putAll(ctx.getParameters());
-    Configurables.configure(sink, context);
-
-    return sink;
-  }
-
-  private void writeEventsToChannel(Channel channel, int eventCount,
-                                    SchemaLocation schemaLocation) throws Exception {
-    for (int i = 0; i < eventCount; i++) {
-      AvroKuduEventProducerTestRecord record = new AvroKuduEventProducerTestRecord();
-      record.setKey(10 * i);
-      record.setLongField(2L * i);
-      record.setDoubleField(2.71828 * i);
-      record.setNullableField(i % 2 == 0 ? null : "taco");
-      record.setStringField(String.format("hello %d", i));
-      ByteArrayOutputStream out = new ByteArrayOutputStream();
-      Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
-      DatumWriter<AvroKuduEventProducerTestRecord> writer =
-          new SpecificDatumWriter<>(AvroKuduEventProducerTestRecord.class);
-      writer.write(record, encoder);
-      encoder.flush();
-      Event e = EventBuilder.withBody(out.toByteArray());
-      if (schemaLocation == SchemaLocation.URL) {
-        String schemaURI = new File(schemaPath).getAbsoluteFile().toURI().toString();
-        e.setHeaders(ImmutableMap.of(SCHEMA_URL_HEADER, schemaURI));
-      } else if (schemaLocation == SchemaLocation.LITERAL) {
-        e.setHeaders(ImmutableMap.of(SCHEMA_LITERAL_HEADER, schemaLiteral));
-      }
-      channel.put(e);
-    }
-  }
-
-  private List<String> makeAnswers(int eventCount) {
-    List<String> answers = Lists.newArrayList();
-    for (int i = 0; i < eventCount; i++) {
-      answers.add(String.format(
-          "INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, " +
-              "STRING nullableField=%s, STRING stringField=hello %s",
-          10 * i,
-          2 * i,
-          2.71828 * i,
-          i % 2 == 0 ? "NULL" : "taco",
-          i));
-    }
-    Collections.sort(answers);
-    return answers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
new file mode 100644
index 0000000..2465428
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.flume.sink;
+
+import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_LITERAL_HEADER;
+import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_PROP;
+import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_URL_HEADER;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AvroKuduOperationsProducerTest extends BaseKuduTest {
+  private static final String schemaPath = "src/test/avro/testAvroKuduOperationsProducer.avsc";
+  private static String schemaLiteral;
+
+  enum SchemaLocation {
+    GLOBAL, URL, LITERAL
+  }
+
+  @BeforeClass
+  public static void setupAvroSchemaBeforeClass() {
+    try {
+      schemaLiteral = Files.toString(new File(schemaPath), Charsets.UTF_8);
+    } catch (IOException e) {
+      throw new FlumeException("Unable to read schema file!", e);
+    }
+  }
+
+  @Test
+  public void testEmptyChannel() throws Exception {
+    testEvents(0, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testOneEvent() throws Exception {
+    testEvents(1, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testThreeEvents() throws Exception {
+    testEvents(3, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testThreeEventsSchemaURLInEvent() throws Exception {
+    testEvents(3, SchemaLocation.URL);
+  }
+
+  @Test
+  public void testThreeEventsSchemaLiteralInEvent() throws Exception {
+    testEvents(3, SchemaLocation.LITERAL);
+  }
+
+  private void testEvents(int eventCount, SchemaLocation schemaLocation)
+      throws Exception {
+    KuduTable table = createNewTable(
+        String.format("test%sevents%s", eventCount, schemaLocation));
+    String tableName = table.getName();
+    String schemaURI = new File(schemaPath).getAbsoluteFile().toURI().toString();
+    Context ctx = schemaLocation != SchemaLocation.GLOBAL ? new Context()
+        : new Context(ImmutableMap.of(PRODUCER_PREFIX + SCHEMA_PROP, schemaURI));
+    KuduSink sink = createSink(tableName, ctx);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    writeEventsToChannel(channel, eventCount, schemaLocation);
+    tx.commit();
+    tx.close();
+
+    Sink.Status status = sink.process();
+    if (eventCount == 0) {
+      assertEquals("incorrect status for empty channel", status, Sink.Status.BACKOFF);
+    } else {
+      assertEquals("incorrect status for non-empty channel", status, Sink.Status.READY);
+    }
+
+    List<String> answers = makeAnswers(eventCount);
+    List<String> rows = scanTableToStrings(table);
+    assertEquals("wrong number of rows inserted", answers.size(), rows.size());
+    assertArrayEquals("wrong rows inserted", answers.toArray(), rows.toArray());
+  }
+
+  private KuduTable createNewTable(String tableName) throws Exception {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(5);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING).nullable(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"))
+            .setNumReplicas(1);
+    return createTable(tableName, new Schema(columns), createOptions);
+  }
+
+  private KuduSink createSink(String tableName, Context ctx) {
+    KuduSink sink = new KuduSink(syncClient);
+    HashMap<String, String> parameters = new HashMap<>();
+    parameters.put(TABLE_NAME, tableName);
+    parameters.put(MASTER_ADDRESSES, getMasterAddresses());
+    parameters.put(PRODUCER, AvroKuduOperationsProducer.class.getName());
+    Context context = new Context(parameters);
+    context.putAll(ctx.getParameters());
+    Configurables.configure(sink, context);
+
+    return sink;
+  }
+
+  private void writeEventsToChannel(Channel channel, int eventCount,
+                                    SchemaLocation schemaLocation) throws Exception {
+    for (int i = 0; i < eventCount; i++) {
+      AvroKuduOperationsProducerTestRecord record = new AvroKuduOperationsProducerTestRecord();
+      record.setKey(10 * i);
+      record.setLongField(2L * i);
+      record.setDoubleField(2.71828 * i);
+      record.setNullableField(i % 2 == 0 ? null : "taco");
+      record.setStringField(String.format("hello %d", i));
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+      DatumWriter<AvroKuduOperationsProducerTestRecord> writer =
+          new SpecificDatumWriter<>(AvroKuduOperationsProducerTestRecord.class);
+      writer.write(record, encoder);
+      encoder.flush();
+      Event e = EventBuilder.withBody(out.toByteArray());
+      if (schemaLocation == SchemaLocation.URL) {
+        String schemaURI = new File(schemaPath).getAbsoluteFile().toURI().toString();
+        e.setHeaders(ImmutableMap.of(SCHEMA_URL_HEADER, schemaURI));
+      } else if (schemaLocation == SchemaLocation.LITERAL) {
+        e.setHeaders(ImmutableMap.of(SCHEMA_LITERAL_HEADER, schemaLiteral));
+      }
+      channel.put(e);
+    }
+  }
+
+  private List<String> makeAnswers(int eventCount) {
+    List<String> answers = Lists.newArrayList();
+    for (int i = 0; i < eventCount; i++) {
+      answers.add(String.format(
+          "INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, " +
+              "STRING nullableField=%s, STRING stringField=hello %s",
+          10 * i,
+          2 * i,
+          2.71828 * i,
+          i % 2 == 0 ? "NULL" : "taco",
+          i));
+    }
+    Collections.sort(answers);
+    return answers;
+  }
+}
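
For anyone wiring the Avro producer up outside the test harness, the createSink() pattern above translates roughly into the standalone sketch below. It is not part of the patch: the master address, table name, and schema path are placeholders, and the target table is assumed to already exist with columns matching the Avro record (as in createNewTable() above).

import java.io.File;
import java.util.HashMap;

import org.apache.flume.Context;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.flume.sink.AvroKuduOperationsProducer;
import org.apache.kudu.flume.sink.KuduSink;

import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_PROP;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;

public class AvroSinkWiringSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder master address; adjust for the target deployment.
    KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();

    HashMap<String, String> parameters = new HashMap<>();
    parameters.put(TABLE_NAME, "avro_events");             // placeholder table name
    parameters.put(MASTER_ADDRESSES, "kudu-master:7051");
    parameters.put(PRODUCER, AvroKuduOperationsProducer.class.getName());
    // Global schema for all events, as in the GLOBAL case of the test above.
    parameters.put(PRODUCER_PREFIX + SCHEMA_PROP,
        new File("/path/to/record.avsc").getAbsoluteFile().toURI().toString());

    KuduSink sink = new KuduSink(client);
    Configurables.configure(sink, new Context(parameters));

    MemoryChannel channel = new MemoryChannel();
    Configurables.configure(channel, new Context());
    sink.setChannel(channel);
    sink.start();
    // In a running agent, Flume's SinkRunner repeatedly calls sink.process(),
    // draining events from the channel into Kudu.
  }
}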

http://git-wip-us.apache.org/repos/asf/kudu/blob/fdfcdba2/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java
deleted file mode 100644
index bf56ca3..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.kudu.flume.sink;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.BaseKuduTest;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class KeyedKuduEventProducerTest extends BaseKuduTest {
-  private static final Logger LOG = LoggerFactory.getLogger(KeyedKuduEventProducerTest.class);
-
-  private KuduTable createNewTable(String tableName) throws Exception {
-    LOG.info("Creating new table...");
-
-    ArrayList<ColumnSchema> columns = new ArrayList<>(2);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(false).build());
-    CreateTableOptions createOptions =
-      new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"))
-                              .setNumReplicas(1);
-    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
-
-    LOG.info("Created new table.");
-
-    return table;
-  }
-
-  @Test
-  public void testEmptyChannelWithInsert() throws Exception {
-    testEvents(0, "false");
-  }
-
-  @Test
-  public void testOneEventWithInsert() throws Exception {
-    testEvents(1, "false");
-  }
-
-  @Test
-  public void testThreeEventsWithInsert() throws Exception {
-    testEvents(3, "false");
-  }
-
-  @Test
-  public void testEmptyChannelWithUpsert() throws Exception {
-    testEvents(0, "true");
-  }
-
-  @Test
-  public void testOneEventWithUpsert() throws Exception {
-    testEvents(1, "true");
-  }
-
-  @Test
-  public void testThreeEventsWithUpsert() throws Exception {
-    testEvents(3, "true");
-  }
-
-  @Test
-  public void testDuplicateRowsWithUpsert() throws Exception {
-    LOG.info("Testing events with upsert...");
-
-    KuduTable table = createNewTable("testDupUpsertEvents");
-    String tableName = table.getName();
-    Context ctx = new Context(ImmutableMap.of("producer.upsert", "true"));
-    KuduSink sink = createSink(tableName, ctx);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-
-    int numRows = 3;
-    for (int i = 0; i < numRows; i++) {
-      Event e = EventBuilder.withBody(String.format("payload body %s", i), Charsets.UTF_8);
-      e.setHeaders(ImmutableMap.of("key", String.format("key %s", i)));
-      channel.put(e);
-    }
-
-    tx.commit();
-    tx.close();
-
-    Sink.Status status = sink.process();
-    assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
-
-    List<String> rows = scanTableToStrings(table);
-    assertEquals(numRows + " row(s) expected", numRows, rows.size());
-
-    for (int i = 0; i < numRows; i++) {
-      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
-    }
-
-    Transaction utx = channel.getTransaction();
-    utx.begin();
-
-    Event dup = EventBuilder.withBody("payload body upserted".getBytes());
-    dup.setHeaders(ImmutableMap.of("key", String.format("key %s", 0)));
-    channel.put(dup);
-
-    utx.commit();
-    utx.close();
-
-    Sink.Status upStatus = sink.process();
-    assertTrue("incorrect status for non-empty channel", upStatus != Sink.Status.BACKOFF);
-
-    List<String> upRows = scanTableToStrings(table);
-    assertEquals(numRows + " row(s) expected", numRows, upRows.size());
-
-    assertTrue("incorrect payload", upRows.get(0).contains("payload body upserted"));
-    for (int i = 1; i < numRows; i++) {
-      assertTrue("incorrect payload", upRows.get(i).contains("payload body " + i));
-    }
-
-    LOG.info("Testing events with upsert finished successfully.");
-  }
-
-  private void testEvents(int eventCount, String upsert) throws Exception {
-    LOG.info("Testing {} events...", eventCount);
-
-    KuduTable table = createNewTable("test" + eventCount + "eventsUp" + upsert);
-    String tableName = table.getName();
-    Context ctx = new Context(ImmutableMap.of("producer.upsert", upsert));
-    KuduSink sink = createSink(tableName, ctx);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-
-    for (int i = 0; i < eventCount; i++) {
-      Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes());
-      e.setHeaders(ImmutableMap.of("key", String.format("key %s", i)));
-      channel.put(e);
-    }
-
-    tx.commit();
-    tx.close();
-
-    Sink.Status status = sink.process();
-    if (eventCount == 0) {
-      assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
-    } else {
-      assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
-    }
-
-    List<String> rows = scanTableToStrings(table);
-    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
-
-    for (int i = 0; i < eventCount; i++) {
-      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
-    }
-
-    LOG.info("Testing {} events finished successfully.", eventCount);
-  }
-
-  private KuduSink createSink(String tableName, Context ctx) {
-    LOG.info("Creating Kudu sink for '{}' table...", tableName);
-
-    KuduSink sink = new KuduSink(syncClient);
-    HashMap<String, String> parameters = new HashMap<>();
-    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
-    parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddresses());
-    parameters.put(KuduSinkConfigurationConstants.PRODUCER, "org.apache.kudu.flume.sink.SimpleKeyedKuduEventProducer");
-    Context context = new Context(parameters);
-    context.putAll(ctx.getParameters());
-    Configurables.configure(sink, context);
-
-    LOG.info("Created Kudu sink for '{}' table.", tableName);
-
-    return sink;
-  }
-}

