kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [1/2] kudu git commit: KUDU-1416 Upsert support for Flume sink
Date Tue, 02 Aug 2016 00:37:24 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 2ed1f37d4 -> 085ceab3e


KUDU-1416 Upsert support for Flume sink

This patch adds a new SimpleKeyedKuduEventProducer class that can upsert. The
original KuduEventProducer class, SimpleKuduEventProducer, assumed it was
inserting a binary payload to a key column, and thus was not compatible with
upserts. SimpleKeyedKuduEventProducer supports inserting or upserting a binary
payload with a STRING key column.

Change-Id: Ibe5b5df70687103ed6916d58148336882aa66d85
Reviewed-on: http://gerrit.cloudera.org:8080/3157
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c9be2711
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c9be2711
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c9be2711

Branch: refs/heads/master
Commit: c9be27119c083d72d47903d44021c36436c963fc
Parents: 2ed1f37
Author: Will Berkeley <wdberkeley@gmail.com>
Authored: Fri May 20 17:08:15 2016 -0400
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue Aug 2 00:18:25 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/flume/sink/KuduSink.java    |   1 -
 .../sink/SimpleKeyedKuduEventProducer.java      |  96 ++++++++
 .../flume/sink/SimpleKuduEventProducer.java     |   4 +-
 .../flume/sink/KeyedKuduEventProducerTest.java  | 217 +++++++++++++++++++
 4 files changed, 315 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c9be2711/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
index 3066323..8c206d8 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
@@ -237,7 +237,6 @@ public class KuduSink extends AbstractSink implements Configurable {
           // into Kudu successfully but the Flume transaction is rolled back for some reason,
           // and a subsequent replay of the same Flume transaction leads to a
           // duplicate key error since the row already exists in Kudu.
-          // (Kudu doesn't support "insert or overwrite" semantics yet.)
           // Note: Duplicate keys will not be reported as errors if ignoreDuplicateRows
           // is enabled in the config.
           if (response.hasRowError()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/c9be2711/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java
new file mode 100644
index 0000000..534fd33
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduEventProducer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.flume.sink;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * <p>A simple serializer that generates one {@link Insert} or {@link Upsert} per {@link Event}
+ * by writing the event body into a BINARY column. The pair (key column name, key column value)
+ * should be a header in the {@link Event}; the column name is configurable but the column type
+ * must be STRING. Multiple key columns are not supported.</p>
+ *
+ * <p>An event without the key header is rejected with a {@link FlumeException} from
+ * {@link #getOperations()}. Instances keep per-event state between
+ * {@link #initialize(Event, KuduTable)} and {@link #getOperations()}, so one instance should not
+ * process several events concurrently.</p>
+ *
+ * <p><strong>Simple Keyed Kudu Event Producer configuration parameters</strong></p>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1>
+ * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
+ * <tr><td>producer.payloadColumn</td><td>payload</td><td>No</td><td>The name of the BINARY
+ * column to write the Flume event body to.</td></tr>
+ * <tr><td>producer.keyColumn</td><td>key</td><td>No</td><td>The name of the STRING key column
+ * of the target Kudu table; also the name of the event header that holds the key value.</td></tr>
+ * <tr><td>producer.upsert</td><td>false</td><td>No</td><td>Whether to insert or upsert
+ * events.</td></tr>
+ * </table>
+ */
+public class SimpleKeyedKuduEventProducer implements KuduEventProducer {
+  // Per-event state, set by initialize() and consumed by getOperations().
+  private byte[] payload;
+  private String key;
+  private KuduTable table;
+  // Configuration, set by configure(Context).
+  private String payloadColumn;
+  private String keyColumn;
+  private boolean upsert;
+
+  /**
+   * No-arg constructor. The sink is configured with this class's name, so it is presumably
+   * instantiated reflectively.
+   */
+  public SimpleKeyedKuduEventProducer() {
+  }
+
+  /**
+   * Reads the producer settings: {@code payloadColumn} (default {@code "payload"}),
+   * {@code keyColumn} (default {@code "key"}) and {@code upsert} (default {@code false}).
+   */
+  @Override
+  public void configure(Context context) {
+    payloadColumn = context.getString("payloadColumn", "payload");
+    keyColumn = context.getString("keyColumn", "key");
+    upsert = context.getBoolean("upsert", false);
+  }
+
+  /** No-op; all settings are read in {@link #configure(Context)}. */
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+
+  /**
+   * Captures the event body (not copied) and the value of the header named by the key column,
+   * which may be null if the header is absent, together with the target table.
+   */
+  @Override
+  public void initialize(Event event, KuduTable table) {
+    this.payload = event.getBody();
+    this.key = event.getHeaders().get(keyColumn);
+    this.table = table;
+  }
+
+  /**
+   * Builds a single {@link Upsert} (when {@code upsert} is enabled) or {@link Insert} that writes
+   * the captured key and payload.
+   *
+   * @return a singleton list containing the operation
+   * @throws FlumeException if the event had no key header, or if the operation cannot be built
+   *     (e.g. a configured column does not exist or has the wrong type)
+   */
+  @Override
+  public List<Operation> getOperations() throws FlumeException {
+    // Checked before the try block so this specific message is not replaced by the generic
+    // wrapper below; otherwise a missing header surfaces only as an opaque client failure.
+    if (key == null) {
+      throw new FlumeException(String.format(
+          "Event is missing the '%s' key header; cannot create Kudu Operation object", keyColumn));
+    }
+    try {
+      Operation op = upsert ? table.newUpsert() : table.newInsert();
+      PartialRow row = op.getRow();
+      row.addString(keyColumn, key);
+      row.addBinary(payloadColumn, payload);
+      return Collections.singletonList(op);
+    } catch (Exception e) {
+      throw new FlumeException("Failed to create Kudu Operation object!", e);
+    }
+  }
+
+  /** No-op; this producer holds no resources. */
+  @Override
+  public void close() {
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kudu/blob/c9be2711/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java
index 9eb07c4..2faf1a1 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java
@@ -33,9 +33,9 @@ import java.util.List;
 
 /**
  * <p>A simple serializer that generates one {@link Insert} per {@link Event} by writing the event
- * body into a BINARY column. The headers are discarded.
+ * body into a BINARY column. The headers are discarded.</p>
  *
- * <p><strong>Simple Kudu Event Producer configuration parameters</strong>
+ * <p><strong>Simple Kudu Event Producer configuration parameters</strong></p>
  *
  * <table cellpadding=3 cellspacing=0 border=1>
  * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>

http://git-wip-us.apache.org/repos/asf/kudu/blob/c9be2711/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java
new file mode 100644
index 0000000..51a0215
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduEventProducerTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kudu.flume.sink;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.shaded.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link SimpleKeyedKuduEventProducer} end to end through a {@link KuduSink} writing to a
+ * table with a STRING "key" column and a BINARY "payload" column.
+ */
+public class KeyedKuduEventProducerTest extends BaseKuduTest {
+  private static final Logger LOG = LoggerFactory.getLogger(KeyedKuduEventProducerTest.class);
+
+  /** Creates a single-replica table (key STRING primary key, payload BINARY) range-partitioned on key. */
+  private KuduTable createNewTable(String tableName) throws Exception {
+    LOG.info("Creating new table...");
+
+    List<ColumnSchema> columns = new ArrayList<>(2);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(false).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"))
+                                .setNumReplicas(1);
+    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
+
+    LOG.info("Created new table.");
+
+    return table;
+  }
+
+  @Test
+  public void testEmptyChannelWithInsert() throws Exception {
+    testEvents(0, "false");
+  }
+
+  @Test
+  public void testOneEventWithInsert() throws Exception {
+    testEvents(1, "false");
+  }
+
+  @Test
+  public void testThreeEventsWithInsert() throws Exception {
+    testEvents(3, "false");
+  }
+
+  @Test
+  public void testEmptyChannelWithUpsert() throws Exception {
+    testEvents(0, "true");
+  }
+
+  @Test
+  public void testOneEventWithUpsert() throws Exception {
+    testEvents(1, "true");
+  }
+
+  @Test
+  public void testThreeEventsWithUpsert() throws Exception {
+    testEvents(3, "true");
+  }
+
+  @Test
+  public void testDuplicateRowsWithUpsert() throws Exception {
+    LOG.info("Testing events with upsert...");
+
+    KuduTable table = createNewTable("testDupUpsertEvents");
+    KuduSink sink =
+        createSink(table.getName(), new Context(ImmutableMap.of("producer.upsert", "true")));
+    Channel channel = attachChannelAndStart(sink);
+
+    int numRows = 3;
+    putInTransaction(channel, numberedEvents(numRows));
+
+    Sink.Status status = sink.process();
+    assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(numRows + " row(s) expected", numRows, rows.size());
+    for (int i = 0; i < numRows; i++) {
+      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+    }
+
+    // An event reusing an existing key should replace that row's payload rather than add a row.
+    putInTransaction(channel, ImmutableList.of(newEvent("key 0", "payload body upserted")));
+
+    Sink.Status upStatus = sink.process();
+    assertTrue("incorrect status for non-empty channel", upStatus != Sink.Status.BACKOFF);
+
+    List<String> upRows = scanTableToStrings(table);
+    assertEquals(numRows + " row(s) expected", numRows, upRows.size());
+
+    assertTrue("incorrect payload", upRows.get(0).contains("payload body upserted"));
+    for (int i = 1; i < numRows; i++) {
+      assertTrue("incorrect payload", upRows.get(i).contains("payload body " + i));
+    }
+
+    LOG.info("Testing events with upsert finished successfully.");
+  }
+
+  /**
+   * Sends {@code eventCount} numbered events through a fresh sink and checks the sink status and
+   * the rows written.
+   *
+   * @param eventCount number of events to send (0 checks the empty-channel BACKOFF path)
+   * @param upsert value for the {@code producer.upsert} setting ("true" or "false")
+   */
+  private void testEvents(int eventCount, String upsert) throws Exception {
+    LOG.info("Testing {} events...", eventCount);
+
+    KuduTable table = createNewTable("test" + eventCount + "eventsUp" + upsert);
+    KuduSink sink =
+        createSink(table.getName(), new Context(ImmutableMap.of("producer.upsert", upsert)));
+    Channel channel = attachChannelAndStart(sink);
+
+    putInTransaction(channel, numberedEvents(eventCount));
+
+    Sink.Status status = sink.process();
+    if (eventCount == 0) {
+      assertEquals("incorrect status for empty channel", Sink.Status.BACKOFF, status);
+    } else {
+      assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
+    }
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+
+    for (int i = 0; i < eventCount; i++) {
+      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+    }
+
+    LOG.info("Testing {} events finished successfully.", eventCount);
+  }
+
+  /** Attaches a fresh, default-configured memory channel to {@code sink} and starts the sink. */
+  private static Channel attachChannelAndStart(KuduSink sink) {
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    return channel;
+  }
+
+  /** Puts all {@code events} into {@code channel} within one committed transaction. */
+  private static void putInTransaction(Channel channel, List<Event> events) {
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (Event event : events) {
+      channel.put(event);
+    }
+    tx.commit();
+    tx.close();
+  }
+
+  /** Returns events i = 0..count-1 with header key "key i" and body "payload body i". */
+  private static List<Event> numberedEvents(int count) {
+    List<Event> events = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      events.add(newEvent(String.format("key %s", i), String.format("payload body %s", i)));
+    }
+    return events;
+  }
+
+  /**
+   * Builds an event whose body is {@code body} encoded as UTF-8 and whose "key" header is
+   * {@code key}. An explicit charset avoids depending on the platform default encoding.
+   */
+  private static Event newEvent(String key, String body) {
+    Event event = EventBuilder.withBody(body, Charsets.UTF_8);
+    event.setHeaders(ImmutableMap.of("key", key));
+    return event;
+  }
+
+  /** Configures a sink for {@code tableName} using SimpleKeyedKuduEventProducer plus {@code ctx}. */
+  private KuduSink createSink(String tableName, Context ctx) {
+    LOG.info("Creating Kudu sink for '{}' table...", tableName);
+
+    KuduSink sink = new KuduSink(syncClient);
+    HashMap<String, String> parameters = new HashMap<>();
+    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
+    parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddresses());
+    parameters.put(KuduSinkConfigurationConstants.PRODUCER,
+        "org.apache.kudu.flume.sink.SimpleKeyedKuduEventProducer");
+    Context context = new Context(parameters);
+    context.putAll(ctx.getParameters());
+    Configurables.configure(sink, context);
+
+    LOG.info("Created Kudu sink for '{}' table.", tableName);
+
+    return sink;
+  }
+}


Mime
View raw message