kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject [05/36] incubator-kudu git commit: [java-client] repackage to org.apache.kudu (Part 1)
Date Mon, 25 Jul 2016 17:15:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/test/java/org/kududb/client/TestScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/kududb/client/TestScannerMultiTablet.java
deleted file mode 100644
index 251057f..0000000
--- a/java/kudu-client/src/test/java/org/kududb/client/TestScannerMultiTablet.java
+++ /dev/null
@@ -1,236 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.stumbleupon.async.Deferred;
-import org.kududb.ColumnSchema;
-import org.kududb.Schema;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-import static org.junit.Assert.assertNull;
-import static org.kududb.Type.STRING;
-import static org.junit.Assert.assertEquals;
-
-public class TestScannerMultiTablet extends BaseKuduTest {
-  // Generate a unique table name
-  private static final String TABLE_NAME =
-      TestScannerMultiTablet.class.getName()+"-"+System.currentTimeMillis();
-
-  private static Schema schema = getSchema();
-  private static KuduTable table;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    BaseKuduTest.setUpBeforeClass();
-    // create a 4-tablets table for scanning
-    CreateTableOptions builder =
-        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key1", "key2"));
-
-    for (int i = 1; i < 4; i++){
-      PartialRow splitRow = schema.newPartialRow();
-      splitRow.addString("key1", "" + i);
-      splitRow.addString("key2", "");
-      builder.addSplitRow(splitRow);
-    }
-
-    createTable(TABLE_NAME, schema, builder);
-
-    table = openTable(TABLE_NAME);
-
-    AsyncKuduSession session = client.newSession();
-    session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_SYNC);
-
-    // The data layout ends up like this:
-    // tablet '', '1': no rows
-    // tablet '1', '2': '111', '122', '133'
-    // tablet '2', '3': '211', '222', '233'
-    // tablet '3', '': '311', '322', '333'
-    String[] keys = new String[] {"1", "2", "3"};
-    for (String key1 : keys) {
-      for (String key2 : keys) {
-        Insert insert = table.newInsert();
-        PartialRow row = insert.getRow();
-        row.addString(0, key1);
-        row.addString(1, key2);
-        row.addString(2, key2);
-        Deferred<OperationResponse> d = session.apply(insert);
-        d.join(DEFAULT_SLEEP);
-      }
-    }
-  }
-
-  // Test various combinations of start/end row keys.
-  @Test(timeout = 100000)
-  public void testKeyStartEnd() throws Exception {
-    assertEquals(0,
-        countRowsInScan(getScanner("", "", "1", ""))); // There's nothing in the 1st tablet
-    assertEquals(1, countRowsInScan(getScanner("", "", "1", "2"))); // Grab the very first row
-    assertEquals(3, countRowsInScan(getScanner("1", "1", "1", "4"))); // Grab the whole 2nd tablet
-    assertEquals(3, countRowsInScan(getScanner("1", "1", "2", ""))); // Same, and peek at the 3rd
-    assertEquals(3, countRowsInScan(getScanner("1", "1", "2", "0"))); // Same, different peek
-    assertEquals(4,
-        countRowsInScan(getScanner("1", "2", "2", "3"))); // Middle of 2nd to middle of 3rd
-    assertEquals(3,
-        countRowsInScan(getScanner("1", "4", "2", "4"))); // Peek at the 2nd then whole 3rd
-    assertEquals(6, countRowsInScan(getScanner("1", "5", "3", "4"))); // Whole 3rd and 4th
-    assertEquals(9, countRowsInScan(getScanner("", "", "4", ""))); // Full table scan
-
-    assertEquals(9,
-        countRowsInScan(getScanner("", "", null, null))); // Full table scan with empty upper
-    assertEquals(9,
-        countRowsInScan(getScanner(null, null, "4", ""))); // Full table scan with empty lower
-    assertEquals(9,
-        countRowsInScan(getScanner(null, null, null, null))); // Full table scan with empty bounds
-
-    // Test that we can close a scanner while in between two tablets. We start on the second
-    // tablet and our first nextRows() will get 3 rows. At that moment we want to close the scanner
-    // before getting on the 3rd tablet.
-    AsyncKuduScanner scanner = getScanner("1", "", null, null);
-    Deferred<RowResultIterator> d = scanner.nextRows();
-    RowResultIterator rri = d.join(DEFAULT_SLEEP);
-    assertEquals(3, rri.getNumRows());
-    d = scanner.close();
-    rri = d.join(DEFAULT_SLEEP);
-    assertNull(rri);
-  }
-
-  // Test mixing start/end row keys with predicates.
-  @Test(timeout = 100000)
-  public void testKeysAndPredicates() throws Exception {
-    // First row from the 2nd tablet.
-    ColumnRangePredicate predicate = new ColumnRangePredicate(schema.getColumnByIndex(2));
-    predicate.setLowerBound("1");
-    predicate.setUpperBound("1");
-    assertEquals(1, countRowsInScan(getScanner("1", "", "2", "", predicate)));
-
-    // All the 2nd tablet.
-    predicate = new ColumnRangePredicate(schema.getColumnByIndex(2));
-    predicate.setLowerBound("1");
-    predicate.setUpperBound("3");
-    assertEquals(3, countRowsInScan(getScanner("1", "", "2", "", predicate)));
-
-    // Value that doesn't exist.
-    predicate = new ColumnRangePredicate(schema.getColumnByIndex(2));
-    predicate.setLowerBound("4");
-    assertEquals(0, countRowsInScan(getScanner("1", "", "2", "", predicate)));
-
-    // First row from every tablet.
-    predicate = new ColumnRangePredicate(schema.getColumnByIndex(2));
-    predicate.setLowerBound("1");
-    predicate.setUpperBound("1");
-    assertEquals(3, countRowsInScan(getScanner(null, null, null, null, predicate)));
-
-    // All the rows.
-    predicate = new ColumnRangePredicate(schema.getColumnByIndex(2));
-    predicate.setLowerBound("1");
-    assertEquals(9, countRowsInScan(getScanner(null, null, null, null, predicate)));
-  }
-
-  @Test(timeout = 100000)
-  public void testProjections() throws Exception {
-    // Test with column names.
-    AsyncKuduScanner.AsyncKuduScannerBuilder builder = client.newScannerBuilder(table);
-    builder.setProjectedColumnNames(Lists.newArrayList(schema.getColumnByIndex(0).getName(),
-        schema.getColumnByIndex(1).getName()));
-    buildScannerAndCheckColumnsCount(builder, 2);
-
-    // Test with column indexes.
-    builder = client.newScannerBuilder(table);
-    builder.setProjectedColumnIndexes(Lists.newArrayList(0, 1));
-    buildScannerAndCheckColumnsCount(builder, 2);
-
-    // Test with column names overriding indexes.
-    builder = client.newScannerBuilder(table);
-    builder.setProjectedColumnIndexes(Lists.newArrayList(0, 1));
-    builder.setProjectedColumnNames(Lists.newArrayList(schema.getColumnByIndex(0).getName()));
-    buildScannerAndCheckColumnsCount(builder, 1);
-
-    // Test with keys last with indexes.
-    builder = client.newScannerBuilder(table);
-    builder.setProjectedColumnIndexes(Lists.newArrayList(2, 1, 0));
-    buildScannerAndCheckColumnsCount(builder, 3);
-
-    // Test with keys last with column names.
-    builder = client.newScannerBuilder(table);
-    builder.setProjectedColumnNames(Lists.newArrayList(schema.getColumnByIndex(2).getName(),
-        schema.getColumnByIndex(0).getName()));
-    buildScannerAndCheckColumnsCount(builder, 2);
-  }
-
-  private AsyncKuduScanner getScanner(String lowerBoundKeyOne,
-                                      String lowerBoundKeyTwo,
-                                      String exclusiveUpperBoundKeyOne,
-                                      String exclusiveUpperBoundKeyTwo) {
-    return getScanner(lowerBoundKeyOne, lowerBoundKeyTwo,
-        exclusiveUpperBoundKeyOne, exclusiveUpperBoundKeyTwo, null);
-  }
-
-  private AsyncKuduScanner getScanner(String lowerBoundKeyOne,
-                                      String lowerBoundKeyTwo,
-                                      String exclusiveUpperBoundKeyOne,
-                                      String exclusiveUpperBoundKeyTwo,
-                                      ColumnRangePredicate predicate) {
-    AsyncKuduScanner.AsyncKuduScannerBuilder builder = client.newScannerBuilder(table);
-
-    if (lowerBoundKeyOne != null) {
-      PartialRow lowerBoundRow = schema.newPartialRow();
-      lowerBoundRow.addString(0, lowerBoundKeyOne);
-      lowerBoundRow.addString(1, lowerBoundKeyTwo);
-      builder.lowerBound(lowerBoundRow);
-    }
-
-    if (exclusiveUpperBoundKeyOne != null) {
-      PartialRow upperBoundRow = schema.newPartialRow();
-      upperBoundRow.addString(0, exclusiveUpperBoundKeyOne);
-      upperBoundRow.addString(1, exclusiveUpperBoundKeyTwo);
-      builder.exclusiveUpperBound(upperBoundRow);
-    }
-
-    if (predicate != null) {
-      builder.addColumnRangePredicate(predicate);
-    }
-
-    return builder.build();
-  }
-
-  private void buildScannerAndCheckColumnsCount(AsyncKuduScanner.AsyncKuduScannerBuilder builder,
-                                                int count) throws Exception {
-    AsyncKuduScanner scanner = builder.build();
-    scanner.nextRows().join(DEFAULT_SLEEP);
-    RowResultIterator rri = scanner.nextRows().join(DEFAULT_SLEEP);
-    assertEquals(count, rri.next().getSchema().getColumns().size());
-  }
-
-  private static Schema getSchema() {
-    ArrayList<ColumnSchema> columns = new ArrayList<>(3);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("key1", STRING)
-        .key(true)
-        .build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("key2", STRING)
-        .key(true)
-        .build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("val", STRING)
-        .nullable(true) // Important because we need to make sure it gets passed in projections
-        .build());
-    return new Schema(columns);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/test/java/org/kududb/client/TestStatistics.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestStatistics.java b/java/kudu-client/src/test/java/org/kududb/client/TestStatistics.java
deleted file mode 100644
index 6cfbee7..0000000
--- a/java/kudu-client/src/test/java/org/kududb/client/TestStatistics.java
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.kududb.client.Statistics.Statistic;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestStatistics extends BaseKuduTest {
-
-  private static final String TABLE_NAME = TestStatistics.class.getName() + "-"
-      + System.currentTimeMillis();
-  private static KuduTable table;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    BaseKuduTest.setUpBeforeClass();
-    CreateTableOptions options = getBasicCreateTableOptions().setNumReplicas(1);
-    table = createTable(TABLE_NAME, basicSchema, options);
-  }
-
-  @Test(timeout = 10000)
-  public void test() throws Exception {
-    KuduSession session = syncClient.newSession();
-    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
-    int rowCount = 20;
-    for (int i = 0; i < rowCount; i++) {
-      Insert insert = createBasicSchemaInsert(table, i);
-      session.apply(insert);
-      if (i % 2 == 1) {
-        session.flush();
-      }
-    }
-    Statistics statistics = syncClient.getStatistics();
-    assertEquals(rowCount / 2, statistics.getClientStatistic(Statistic.WRITE_RPCS));
-    assertEquals(rowCount, statistics.getClientStatistic(Statistic.WRITE_OPS));
-    assertEquals(0, statistics.getClientStatistic(Statistic.RPC_ERRORS));
-    assertEquals(0, statistics.getClientStatistic(Statistic.OPS_ERRORS));
-
-    // Use default flush mode.
-    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
-    // Insert duplicate rows, expect to get ALREADY_PRESENT error.
-    long byteSize = 0;
-    for (int i = 0; i < rowCount; i++) {
-      Insert insert = createBasicSchemaInsert(table, i);
-      session.apply(insert);
-      byteSize += insert.getRowOperationSizeBytes();
-    }
-    assertEquals(rowCount + rowCount / 2, statistics.getClientStatistic(Statistic.WRITE_RPCS));
-    assertEquals(rowCount, statistics.getClientStatistic(Statistic.WRITE_OPS));
-    assertEquals(0, statistics.getClientStatistic(Statistic.RPC_ERRORS));
-    assertEquals(rowCount, statistics.getClientStatistic(Statistic.OPS_ERRORS));
-    assertEquals(byteSize * 2, statistics.getClientStatistic(Statistic.BYTES_WRITTEN));
-
-    assertEquals(1, statistics.getTableSet().size());
-    assertEquals(1, statistics.getTabletSet().size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/test/java/org/kududb/client/TestStatus.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestStatus.java b/java/kudu-client/src/test/java/org/kududb/client/TestStatus.java
deleted file mode 100644
index 11bd23b..0000000
--- a/java/kudu-client/src/test/java/org/kududb/client/TestStatus.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import org.junit.Test;
-import org.kududb.client.Status;
-
-import static org.junit.Assert.*;
-
-public class TestStatus {
-
-  @Test
-  public void testOKStatus() {
-    Status s = Status.OK();
-    assertTrue(s.ok());
-    assertFalse(s.isNotAuthorized());
-    assertEquals(-1, s.getPosixCode());
-    assertEquals("OK", s.toString());
-  }
-
-  @Test
-  public void testStatusNonPosix() {
-    Status s = Status.Aborted("foo");
-    assertFalse(s.ok());
-    assertTrue(s.isAborted());
-    assertEquals("ABORTED", s.getCodeName());
-    assertEquals("foo", s.getMessage());
-    assertEquals(-1, s.getPosixCode());
-    assertEquals("Aborted: foo", s.toString());
-  }
-
-  @Test
-  public void testPosixCode() {
-    Status s = Status.NotFound("File not found", 2);
-    assertFalse(s.ok());
-    assertFalse(s.isAborted());
-    assertTrue(s.isNotFound());
-    assertEquals(2, s.getPosixCode());
-    assertEquals("Not found: File not found (error 2)", s.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/test/java/org/kududb/client/TestTestUtils.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestTestUtils.java b/java/kudu-client/src/test/java/org/kududb/client/TestTestUtils.java
deleted file mode 100644
index b150f8e..0000000
--- a/java/kudu-client/src/test/java/org/kududb/client/TestTestUtils.java
+++ /dev/null
@@ -1,114 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for non-trivial helper methods in TestUtils.
- */
-public class TestTestUtils {
-
-  public static final Logger LOG = LoggerFactory.getLogger(TestUtils.class);
-
-  private Process proc;
-
-  @After
-  public void tearDown() {
-    if (proc != null) {
-      proc.destroy();
-    }
-  }
-
-  /**
-   * Starts a process that executes the "yes" command (which prints 'y' in a loop),
-   * sends a SIGSTOP to the process, and ensures that SIGSTOP does indeed pause the process.
-   * Afterwards, sends a SIGCONT to the process and ensures that the process resumes.
-   */
-  @Test(timeout = 2000)
-  public void testPauseAndResume() throws Exception {
-    ProcessBuilder processBuilder = new ProcessBuilder("yes");
-    proc = processBuilder.start();
-    LineCounterRunnable lineCounter = new LineCounterRunnable(proc.getInputStream());
-    Thread thread = new Thread(lineCounter);
-    thread.setDaemon(true);
-    thread.start();
-    TestUtils.pauseProcess(proc);
-    long prevCount;
-    do {
-      prevCount = lineCounter.getCount();
-      Thread.sleep(10);
-    } while (prevCount != lineCounter.getCount());
-    assertEquals(prevCount, lineCounter.getCount());
-    TestUtils.resumeProcess(proc);
-    do {
-      prevCount = lineCounter.getCount();
-      Thread.sleep(10);
-    } while (prevCount == lineCounter.getCount());
-    assertTrue(lineCounter.getCount() > prevCount);
-  }
-
-  /**
-   * Counts the number of lines in a specified input stream.
-   */
-  static class LineCounterRunnable implements Runnable {
-    private final AtomicLong counter;
-    private final InputStream is;
-
-    public LineCounterRunnable(InputStream is) {
-      this.is = is;
-      counter = new AtomicLong(0);
-    }
-
-    @Override
-    public void run() {
-      BufferedReader in = null;
-      try {
-        in = new BufferedReader(new InputStreamReader(is));
-        while (in.readLine() != null) {
-          counter.incrementAndGet();
-        }
-      } catch (Exception e) {
-        LOG.error("Error while reading from the process", e);
-      } finally {
-        if (in != null) {
-          try {
-            in.close();
-          } catch (IOException e) {
-            LOG.error("Error closing the stream", e);
-          }
-        }
-      }
-    }
-
-    public long getCount() {
-      return counter.get();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/test/java/org/kududb/client/TestTimeouts.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestTimeouts.java b/java/kudu-client/src/test/java/org/kududb/client/TestTimeouts.java
deleted file mode 100644
index 3e78918..0000000
--- a/java/kudu-client/src/test/java/org/kududb/client/TestTimeouts.java
+++ /dev/null
@@ -1,68 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.stumbleupon.async.TimeoutException;
-import org.junit.Test;
-
-public class TestTimeouts extends BaseKuduTest {
-
-  private static final String TABLE_NAME =
-      TestTimeouts.class.getName() + "-" + System.currentTimeMillis();
-
-  /**
-   * This test case tries different methods that should all timeout, while relying on the client to
-   * pass down the timeouts to the session and scanner.
-   */
-  @Test(timeout = 100000)
-  public void testLowTimeouts() throws Exception {
-    KuduClient lowTimeoutsClient = new KuduClient.KuduClientBuilder(masterAddresses)
-        .defaultAdminOperationTimeoutMs(1)
-        .defaultOperationTimeoutMs(1)
-        .build();
-
-    try {
-      lowTimeoutsClient.listTabletServers();
-      fail("Should have timed out");
-    } catch (KuduException ex) {
-      // Expected.
-    }
-
-    createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
-    KuduTable table = openTable(TABLE_NAME);
-
-    KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
-
-    try {
-      lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
-      fail("Should have timed out");
-    } catch (KuduException ex) {
-      assertTrue(ex.getStatus().isTimedOut());
-    }
-
-    KuduScanner lowTimeoutScanner = lowTimeoutsClient.newScannerBuilder(table).build();
-    try {
-      lowTimeoutScanner.nextRows();
-      fail("Should have timed out");
-    } catch (KuduException ex) {
-      assertTrue(ex.getStatus().isTimedOut());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/test/java/org/kududb/client/TestUtils.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestUtils.java b/java/kudu-client/src/test/java/org/kududb/client/TestUtils.java
deleted file mode 100644
index 392bed9..0000000
--- a/java/kudu-client/src/test/java/org/kududb/client/TestUtils.java
+++ /dev/null
@@ -1,289 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.client;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.sun.security.auth.module.UnixSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.management.VMManagement;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A grouping of methods that help unit testing.
- */
-public class TestUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class);
-
-  // Used by pidOfProcess()
-  private static String UNIX_PROCESS_CLS_NAME =  "java.lang.UNIXProcess";
-  private static Set<String> VALID_SIGNALS =  ImmutableSet.of("STOP", "CONT", "TERM", "KILL");
-
-  private static final String BIN_DIR_PROP = "binDir";
-
-  /**
-   * @return the path of the flags file to pass to daemon processes
-   * started by the tests
-   */
-  public static String getFlagsPath() {
-    URL u = BaseKuduTest.class.getResource("/flags");
-    if (u == null) {
-      throw new RuntimeException("Unable to find 'flags' file");
-    }
-    if (u.getProtocol() == "file") {
-      return urlToPath(u);
-    }
-    // If the flags are inside a JAR, extract them into our temporary
-    // test directory.
-    try {
-      // Somewhat unintuitively, createTempFile() actually creates the file,
-      // not just the path, so we have to use REPLACE_EXISTING below.
-      Path tmpFile = Files.createTempFile(
-          Paths.get(getBaseDir()), "kudu-flags", ".flags");
-      Files.copy(BaseKuduTest.class.getResourceAsStream("/flags"), tmpFile,
-          StandardCopyOption.REPLACE_EXISTING);
-      return tmpFile.toAbsolutePath().toString();
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to extract flags file into tmp", e);
-    }
-  }
-
-  /**
-   * Return the path portion of a file URL, after decoding the escaped
-   * components. This fixes issues when trying to build within a
-   * working directory with special characters.
-   */
-  private static String urlToPath(URL u) {
-    try {
-      return URLDecoder.decode(u.getPath(), "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private static String findBuildDir() {
-    URL myUrl = BaseKuduTest.class.getProtectionDomain().getCodeSource().getLocation();
-    File myPath = new File(urlToPath(myUrl));
-    while (myPath != null) {
-      if (new File(myPath, ".git").isDirectory()) {
-        return new File(myPath, "build/latest/bin").getAbsolutePath();
-      }
-      myPath = myPath.getParentFile();
-    }
-    LOG.warn("Unable to find build dir! myUrl={}", myUrl);
-    return null;
-  }
-
-  /**
-   * @param binName the binary to look for (eg 'kudu-tserver')
-   * @return the absolute path of that binary
-   * @throws FileNotFoundException if no such binary is found
-   */
-  public static String findBinary(String binName) throws FileNotFoundException {
-    String binDir = System.getProperty(BIN_DIR_PROP);
-    if (binDir != null) {
-      LOG.info("Using binary directory specified by property: {}",
-          binDir);
-    } else {
-      binDir = findBuildDir();
-    }
-
-    File candidate = new File(binDir, binName);
-    if (candidate.canExecute()) {
-      return candidate.getAbsolutePath();
-    }
-    throw new FileNotFoundException("Cannot find binary " + binName +
-        " in binary directory " + binDir);
-  }
-
-  /**
-   * @return the base directory within which we will store server data
-   */
-  public static String getBaseDir() {
-    String s = System.getenv("TEST_TMPDIR");
-    if (s == null) {
-      s = String.format("/tmp/kudutest-%d", new UnixSystem().getUid());
-    }
-    File f = new File(s);
-    f.mkdirs();
-    return f.getAbsolutePath();
-  }
-
-  /**
-   * Finds the next free port, starting with the one passed. Keep in mind the
-   * time-of-check-time-of-use nature of this method, the returned port might become occupied
-   * after it was checked for availability.
-   * @param startPort first port to be probed
-   * @return a currently usable port
-   * @throws IOException IOE is thrown if we can't close a socket we tried to open or if we run
-   * out of ports to try
-   */
-  public static int findFreePort(int startPort) throws IOException {
-    ServerSocket ss;
-    for(int i = startPort; i < 65536; i++) {
-      try {
-        ss = new ServerSocket();
-        SocketAddress address = new InetSocketAddress(getUniqueLocalhost(), i);
-        ss.bind(address);
-      } catch (IOException e) {
-        continue;
-      }
-      ss.close();
-      return i;
-    }
-    throw new IOException("Ran out of ports.");
-  }
-
-  /**
-   * Finds a specified number of ports, starting with the one passed. Keep in mind the
-   * time-of-check-time-of-use nature of this method.
-   * @see {@link #findFreePort(int)}
-   * @param startPort First port to be probed.
-   * @param numPorts Number of ports to reserve.
-   * @return A list of currently usable ports.
-   * @throws IOException IOE is thrown if we can't close a socket we tried to open or if we run
-   * out of ports to try.
-   */
-  public static List<Integer> findFreePorts(int startPort, int numPorts) throws IOException {
-    List<Integer> ports = Lists.newArrayListWithCapacity(numPorts);
-    for (int i = 0; i < numPorts; i++) {
-      startPort = findFreePort(startPort);
-      ports.add(startPort++);
-    }
-    return ports;
-  }
-
-  /**
-   * Gets the pid of a specified process. Relies on reflection and only works on
-   * UNIX processes, not guaranteed to work on JDKs other than Oracle and OpenJDK.
-   * @param proc The specified process.
-   * @return The process UNIX pid.
-   * @throws IllegalArgumentException If the process is not a UNIXProcess.
-   * @throws Exception If there are other errors getting the pid via reflection.
-   */
-  static int pidOfProcess(Process proc) throws Exception {
-    Class<?> procCls = proc.getClass();
-    if (!procCls.getName().equals(UNIX_PROCESS_CLS_NAME)) {
-      throw new IllegalArgumentException("stopProcess() expects objects of class " +
-          UNIX_PROCESS_CLS_NAME + ", but " + procCls.getName() + " was passed in instead!");
-    }
-    Field pidField = procCls.getDeclaredField("pid");
-    pidField.setAccessible(true);
-    return (Integer) pidField.get(proc);
-  }
-
-  /**
-   * Send a signal specified by its string representation to the specified process.
-   * TODO: Use JNR/JNR-Posix instead of forking the JVM to exec "kill".
-   * @param proc The specified process.
-   * @param sig The string representation of the signal (e.g., STOP for SIGSTOP).
-   * @throws IllegalArgumentException If the signal type is not supported.
-   * @throws IllegalStateException If we are unable to send the specified signal.
-   */
-  static void signalProcess(Process proc, String sig) throws Exception {
-    if (!VALID_SIGNALS.contains(sig)) {
-      throw new IllegalArgumentException(sig + " is not a supported signal, only " +
-              Joiner.on(",").join(VALID_SIGNALS) + " are supported");
-    }
-    int pid = pidOfProcess(proc);
-    int rv = Runtime.getRuntime()
-            .exec(String.format("kill -%s %d", sig, pid))
-            .waitFor();
-    if (rv != 0) {
-      throw new IllegalStateException(String.format("unable to send SIG%s to process %s(pid=%d): " +
-              "expected return code from kill, but got %d instead", sig, proc, pid, rv));
-    }
-  }
-
-  /**
-   * Pause the specified process by sending a SIGSTOP using the kill command.
-   * @param proc The specified process.
-   * @throws Exception If error prevents us from pausing the process.
-   */
-  static void pauseProcess(Process proc) throws Exception {
-    signalProcess(proc, "STOP");
-  }
-
-  /**
-   * Resumes the specified process by sending a SIGCONT using the kill command.
-   * @param proc The specified process.
-   * @throws Exception If error prevents us from resuming the process.
-   */
-  static void resumeProcess(Process proc) throws Exception {
-    signalProcess(proc, "CONT");
-  }
-
-  /**
-   * This is used to generate unique loopback IPs for parallel test running.
-   * @return the local PID of this process
-   */
-  static int getPid() {
-    try {
-      RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
-      java.lang.reflect.Field jvm = runtime.getClass().getDeclaredField("jvm");
-      jvm.setAccessible(true);
-      VMManagement mgmt = (VMManagement)jvm.get(runtime);
-      Method pid_method = mgmt.getClass().getDeclaredMethod("getProcessId");
-      pid_method.setAccessible(true);
-
-      return (Integer)pid_method.invoke(mgmt);
-    } catch (Exception e) {
-      LOG.warn("Cannot get PID", e);
-      return 1;
-    }
-  }
-
-  /**
-   * The generated IP is based on pid, so this requires that the parallel tests
-   * run in separate VMs.
-   *
-   * On OSX, the above trick doesn't work, so we can't run parallel tests on OSX.
-   * Given that, we just return the normal localhost IP.
-   *
-   * @return a unique loopback IP address for this PID. This allows running
-   * tests in parallel, since 127.0.0.0/8 all act as loopbacks on Linux.
-   */
-  static String getUniqueLocalhost() {
-    if ("Mac OS X".equals(System.getProperty("os.name"))) {
-      return "127.0.0.1";
-    }
-
-    int pid = getPid();
-    return "127." + ((pid & 0xff00) >> 8) + "." + (pid & 0xff) + ".1";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/test/java/org/kududb/util/TestAsyncUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/util/TestAsyncUtil.java b/java/kudu-client/src/test/java/org/kududb/util/TestAsyncUtil.java
deleted file mode 100644
index fce7ddc..0000000
--- a/java/kudu-client/src/test/java/org/kududb/util/TestAsyncUtil.java
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.util;
-
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test for {@link AsyncUtil}.
- */
-public class TestAsyncUtil {
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  @Test
-  public void testAddCallbacksDeferring() throws Exception {
-    Deferred<String> d = new Deferred<String>();
-    TestCallback cb = new TestCallback();
-    TestErrback eb = new TestErrback();
-
-    // Test normal callbacks.
-    AsyncUtil.addCallbacksDeferring(d, cb, eb);
-    final String testStr = "hello world";
-    d.callback(testStr);
-    assertEquals(d.join(), "callback: " + testStr);
-
-    d = new Deferred<String>();
-    AsyncUtil.addCallbacksDeferring(d, cb, eb);
-    d.callback(new IllegalArgumentException());
-    assertEquals(d.join(), "illegal arg");
-
-    d = new Deferred<String>();
-    AsyncUtil.addCallbacksDeferring(d, cb, eb);
-    d.callback(new IllegalStateException());
-    exception.expect(IllegalStateException.class);
-    d.join();
-  }
-
-  final class TestCallback implements Callback<Deferred<String>, String> {
-    @Override
-    public Deferred<String> call(String arg) throws Exception {
-      return Deferred.fromResult("callback: " + arg);
-    }
-  }
-
-  final class TestErrback implements Callback<Deferred<String>, Exception> {
-    @Override
-    public Deferred<String> call(Exception arg) {
-      if (arg instanceof IllegalArgumentException) {
-        return Deferred.fromResult("illegal arg");
-      }
-      return Deferred.fromError(arg);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/test/java/org/kududb/util/TestMurmurHash.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/util/TestMurmurHash.java b/java/kudu-client/src/test/java/org/kududb/util/TestMurmurHash.java
deleted file mode 100644
index 051107c..0000000
--- a/java/kudu-client/src/test/java/org/kududb/util/TestMurmurHash.java
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.util;
-
-import com.google.common.primitives.UnsignedLongs;
-import com.sangupta.murmur.Murmur2;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test Murmur2 Hash64 returns the expected values for inputs.
- *
- * These tests are duplicated on the C++ side to ensure that hash computations
- * are stable across both platforms.
- */
-public class TestMurmurHash {
-
-    @Test
-    public void testMurmur2Hash64() throws Exception {
-      long hash;
-
-      hash = Murmur2.hash64("ab".getBytes("UTF-8"), 2, 0);
-      assertEquals(UnsignedLongs.parseUnsignedLong("7115271465109541368"), hash);
-
-      hash = Murmur2.hash64("abcdefg".getBytes("UTF-8"), 7, 0);
-      assertEquals(UnsignedLongs.parseUnsignedLong("2601573339036254301"), hash);
-
-      hash = Murmur2.hash64("quick brown fox".getBytes("UTF-8"), 15, 42);
-      assertEquals(UnsignedLongs.parseUnsignedLong("3575930248840144026"), hash);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-client/src/test/java/org/kududb/util/TestNetUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/util/TestNetUtil.java b/java/kudu-client/src/test/java/org/kududb/util/TestNetUtil.java
deleted file mode 100644
index 5c003ae..0000000
--- a/java/kudu-client/src/test/java/org/kududb/util/TestNetUtil.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.util;
-
-import com.google.common.net.HostAndPort;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test for {@link NetUtil}.
- */
-public class TestNetUtil {
-
-  /**
-   * Tests parsing strings into {@link HostAndPort} objects with and without specifying
-   * the port in the string.
-   */
-  @Test
-  public void testParseString() {
-    String aStringWithPort = "1.2.3.4:1234";
-    HostAndPort hostAndPortForAStringWithPort = NetUtil.parseString(aStringWithPort, 0);
-    assertEquals(hostAndPortForAStringWithPort.getHostText(), "1.2.3.4");
-    assertEquals(hostAndPortForAStringWithPort.getPort(), 1234);
-
-    String aStringWithoutPort = "1.2.3.4";
-    HostAndPort hostAndPortForAStringWithoutPort = NetUtil.parseString(aStringWithoutPort, 12345);
-    assertEquals(hostAndPortForAStringWithoutPort.getHostText(), aStringWithoutPort);
-    assertEquals(hostAndPortForAStringWithoutPort.getPort(), 12345);
-  }
-
-  /**
-   * Tests parsing comma separated list of "host:port" pairs and hosts into a list of
-   * {@link HostAndPort} objects.
-   */
-  @Test
-  public void testParseStrings() {
-    String testAddrs = "1.2.3.4.5,10.0.0.1:5555,127.0.0.1:7777";
-    List<HostAndPort> hostsAndPorts = NetUtil.parseStrings(testAddrs, 3333);
-    assertArrayEquals(hostsAndPorts.toArray(),
-                         new HostAndPort[] { HostAndPort.fromParts("1.2.3.4.5", 3333),
-                           HostAndPort.fromParts("10.0.0.1", 5555),
-                           HostAndPort.fromParts("127.0.0.1", 7777) }
-    );
-  }
-
-  @Test
-  public void testHostsAndPortsToString() {
-    List<HostAndPort> hostsAndPorts = Arrays.asList(
-        HostAndPort.fromParts("127.0.0.1", 1111),
-        HostAndPort.fromParts("1.2.3.4.5", 0)
-    );
-    assertEquals(NetUtil.hostsAndPortsToString(hostsAndPorts), "127.0.0.1:1111,1.2.3.4.5:0");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java
new file mode 100644
index 0000000..7166300
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduEventProducer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kududb.flume.sink;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Operation;
+
+import java.util.List;
+
+/**
+ * Interface for an event producer which produces Kudu Operations to write
+ * the headers and body of an event in a Kudu table. This is configurable,
+ * so any config params required should be taken through this. The columns
+ * should exist in the table specified in the configuration for the KuduSink.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface KuduEventProducer extends Configurable, ConfigurableComponent {
+  /**
+   * Initialize the event producer.
+   * @param event to be written to Kudu
+   * @param table the KuduTable object used for creating Kudu Operation objects
+   */
+  void initialize(Event event, KuduTable table);
+
+  /**
+   * Get the operations that should be written out to Kudu as a result of this
+   * event. This list is written to Kudu using the Kudu client API.
+   * @return List of {@link org.kududb.client.Operation} which
+   * are written as such to Kudu
+   */
+  List<Operation> getOperations();
+
+  /**
+   * Clean up any state. This will be called when the sink is being stopped.
+   */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
new file mode 100644
index 0000000..080cda2
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kududb.flume.sink;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.AsyncKuduClient;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduSession;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Operation;
+import org.kududb.client.OperationResponse;
+import org.kududb.client.SessionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * <p>A Flume sink that reads events from a channel and writes them to a Kudu table.
+ *
+ * <p><strong>Flume Kudu Sink configuration parameters</strong>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1>
+ * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
+ * <tr><td>channel</td><td></td><td>Yes</td><td>The name of the Flume channel to read from.</td></tr>
+ * <tr><td>type</td><td></td><td>Yes</td><td>Component name. Must be {@code org.kududb.flume.sink.KuduSink}</td></tr>
+ * <tr><td>masterAddresses</td><td></td><td>Yes</td><td>Comma-separated list of "host:port" pairs of the Kudu master servers. The port is optional.</td></tr>
+ * <tr><td>tableName</td><td></td><td>Yes</td><td>The name of the Kudu table to write to.</td></tr>
+ * <tr><td>batchSize</td><td>100</td><td>No</td><td>The maximum number of events the sink will attempt to take from the channel per transaction.</td></tr>
+ * <tr><td>ignoreDuplicateRows</td><td>true</td><td>No</td><td>Whether to ignore errors indicating that we attempted to insert duplicate rows into Kudu.</td></tr>
+ * <tr><td>timeoutMillis</td><td>10000</td><td>No</td><td>Timeout period for Kudu write operations, in milliseconds.</td></tr>
+ * <tr><td>producer</td><td>{@link org.kududb.flume.sink.SimpleKuduEventProducer}</td><td>No</td><td>The fully qualified class name of the {@link KuduEventProducer} the sink should use.</td></tr>
+ * <tr><td>producer.*</td><td></td><td>(Varies by event producer)</td><td>Configuration properties to pass to the event producer implementation.</td></tr>
+ * </table>
+ *
+ * <p><strong>Installation</strong>
+ *
+ * <p>After building the sink, in order to use it with Flume, place the file named
+ * <tt>kudu-flume-sink-<em>VERSION</em>-jar-with-dependencies.jar</tt> in the
+ * Flume <tt>plugins.d</tt> directory under <tt>kudu-flume-sink/lib/</tt>.
+ *
+ * <p>For detailed instructions on using Flume's plugins.d mechanism, please see the plugins.d
+ * section of the <a href="https://flume.apache.org/FlumeUserGuide.html#the-plugins-d-directory">Flume User Guide</a>.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduSink extends AbstractSink implements Configurable {
+  private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
+  private static final Long DEFAULT_BATCH_SIZE = 100L;
+  private static final Long DEFAULT_TIMEOUT_MILLIS =
+          AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+  private static final String DEFAULT_KUDU_EVENT_PRODUCER =
+          "org.kududb.flume.sink.SimpleKuduEventProducer";
+  private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
+
+  private String masterAddresses;
+  private String tableName;
+  private long batchSize;
+  private long timeoutMillis;
+  private boolean ignoreDuplicateRows;
+  private KuduTable table;
+  private KuduSession session;
+  private KuduClient client;
+  private KuduEventProducer eventProducer;
+  private String eventProducerType;
+  private Context producerContext;
+  private SinkCounter sinkCounter;
+
+  public KuduSink() {
+    this(null);
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  public KuduSink(KuduClient kuduClient) {
+    this.client = kuduClient;
+  }
+
+  @Override
+  public void start() {
+    Preconditions.checkState(table == null && session == null, "Please call stop " +
+        "before calling start on an old instance.");
+
+    // This is not null only inside tests
+    if (client == null) {
+      client = new KuduClient.KuduClientBuilder(masterAddresses).build();
+    }
+    session = client.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    session.setTimeoutMillis(timeoutMillis);
+    session.setIgnoreAllDuplicateRows(ignoreDuplicateRows);
+
+    try {
+      table = client.openTable(tableName);
+    } catch (Exception e) {
+      sinkCounter.incrementConnectionFailedCount();
+      String msg = String.format("Could not open table '%s' from Kudu", tableName);
+      logger.error(msg, e);
+      throw new FlumeException(msg, e);
+    }
+
+    super.start();
+    sinkCounter.incrementConnectionCreatedCount();
+    sinkCounter.start();
+  }
+
+  @Override
+  public void stop() {
+    try {
+      if (client != null) {
+        client.shutdown();
+      }
+      client = null;
+      table = null;
+      session = null;
+    } catch (Exception e) {
+      throw new FlumeException("Error closing client.", e);
+    }
+    sinkCounter.incrementConnectionClosedCount();
+    sinkCounter.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void configure(Context context) {
+    masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES);
+    tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME);
+
+    batchSize = context.getLong(
+            KuduSinkConfigurationConstants.BATCH_SIZE, DEFAULT_BATCH_SIZE);
+    timeoutMillis = context.getLong(
+            KuduSinkConfigurationConstants.TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS);
+    ignoreDuplicateRows = context.getBoolean(
+            KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS);
+    eventProducerType = context.getString(KuduSinkConfigurationConstants.PRODUCER);
+
+    Preconditions.checkNotNull(masterAddresses,
+        "Master address cannot be empty, please specify '" +
+                KuduSinkConfigurationConstants.MASTER_ADDRESSES +
+                "' in configuration file");
+    Preconditions.checkNotNull(tableName,
+        "Table name cannot be empty, please specify '" +
+                KuduSinkConfigurationConstants.TABLE_NAME +
+                "' in configuration file");
+
+    // Check for an event producer; if none is configured (null or empty), use the default type.
+    if (eventProducerType == null || eventProducerType.isEmpty()) {
+      eventProducerType = DEFAULT_KUDU_EVENT_PRODUCER;
+      logger.info("No Kudu event producer defined, will use default");
+    }
+
+    producerContext = new Context();
+    producerContext.putAll(context.getSubProperties(
+            KuduSinkConfigurationConstants.PRODUCER_PREFIX));
+
+    try {
+      Class<? extends KuduEventProducer> clazz =
+          (Class<? extends KuduEventProducer>)
+          Class.forName(eventProducerType);
+      eventProducer = clazz.newInstance();
+      eventProducer.configure(producerContext);
+    } catch (Exception e) {
+      logger.error("Could not instantiate Kudu event producer." , e);
+      Throwables.propagate(e);
+    }
+    sinkCounter = new SinkCounter(this.getName());
+  }
+
+  public KuduClient getClient() {
+    return client;
+  }
+
+  @Override
+  public Status process() throws EventDeliveryException {
+    if (session.hasPendingOperations()) {
+      // If for whatever reason we have pending operations then just refuse to process
+      // and tell caller to try again a bit later. We don't want to pile on the kudu
+      // session object.
+      return Status.BACKOFF;
+    }
+
+    Channel channel = getChannel();
+    Transaction txn = channel.getTransaction();
+
+    txn.begin();
+
+    try {
+      long txnEventCount = 0;
+      for (; txnEventCount < batchSize; txnEventCount++) {
+        Event event = channel.take();
+        if (event == null) {
+          break;
+        }
+
+        eventProducer.initialize(event, table);
+        List<Operation> operations = eventProducer.getOperations();
+        for (Operation o : operations) {
+          session.apply(o);
+        }
+      }
+
+      logger.debug("Flushing {} events", txnEventCount);
+      List<OperationResponse> responses = session.flush();
+      if (responses != null) {
+        for (OperationResponse response : responses) {
+          // Throw an EventDeliveryException if at least one of the responses was
+          // a row error. Row errors can occur for example when an event is inserted
+          // into Kudu successfully but the Flume transaction is rolled back for some reason,
+          // and a subsequent replay of the same Flume transaction leads to a
+          // duplicate key error since the row already exists in Kudu.
+          // (Kudu doesn't support "insert or overwrite" semantics yet.)
+          // Note: Duplicate keys will not be reported as errors if ignoreDuplicateRows
+          // is enabled in the config.
+          if (response.hasRowError()) {
+            throw new EventDeliveryException("Failed to flush one or more changes. " +
+                "Transaction rolled back: " + response.getRowError().toString());
+          }
+        }
+      }
+
+      if (txnEventCount == 0) {
+        sinkCounter.incrementBatchEmptyCount();
+      } else if (txnEventCount == batchSize) {
+        sinkCounter.incrementBatchCompleteCount();
+      } else {
+        sinkCounter.incrementBatchUnderflowCount();
+      }
+
+      txn.commit();
+
+      if (txnEventCount == 0) {
+        return Status.BACKOFF;
+      }
+
+      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+      return Status.READY;
+
+    } catch (Throwable e) {
+      txn.rollback();
+
+      String msg = "Failed to commit transaction. Transaction rolled back.";
+      logger.error(msg, e);
+      if (e instanceof Error || e instanceof RuntimeException) {
+        Throwables.propagate(e);
+      } else {
+        logger.error(msg, e);
+        throw new EventDeliveryException(msg, e);
+      }
+    } finally {
+      txn.close();
+    }
+
+    return Status.BACKOFF;
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  KuduEventProducer getEventProducer() {
+    return eventProducer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
new file mode 100644
index 0000000..6486137
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kududb.flume.sink;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+
+/**
+ * Constants used for configuration of KuduSink
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduSinkConfigurationConstants {
+  /**
+   * Comma-separated list of "host:port" pairs of the masters (port optional).
+   */
+  public static final String MASTER_ADDRESSES = "masterAddresses";
+
+  /**
+   * The name of the table in Kudu to write to.
+   */
+  public static final String TABLE_NAME = "tableName";
+
+  /**
+   * The fully qualified class name of the Kudu event producer the sink should use.
+   */
+  public static final String PRODUCER = "producer";
+
+  /**
+   * Configuration to pass to the Kudu event producer.
+   */
+  public static final String PRODUCER_PREFIX = PRODUCER + ".";
+
+  /**
+   * Maximum number of events the sink should take from the channel per
+   * transaction, if available.
+   */
+  public static final String BATCH_SIZE = "batchSize";
+
+  /**
+   * Timeout period for Kudu operations, in milliseconds.
+   */
+  public static final String TIMEOUT_MILLIS = "timeoutMillis";
+
+  /**
+   * Whether to ignore errors indicating that we attempted to insert duplicate rows into Kudu.
+   */
+  public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows";
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java
new file mode 100644
index 0000000..b5be054
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduEventProducer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kududb.flume.sink;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.kududb.client.Insert;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Operation;
+import org.kududb.client.PartialRow;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * <p>A simple serializer that generates one {@link Insert} per {@link Event} by writing the event
+ * body into a BINARY column. The headers are discarded.
+ *
+ * <p><strong>Simple Kudu Event Producer configuration parameters</strong>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1>
+ * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
+ * <tr><td>producer.payloadColumn</td><td>payload</td><td>No</td><td>The name of the BINARY column to write the Flume event body to.</td></tr>
+ * </table>
+ */
+public class SimpleKuduEventProducer implements KuduEventProducer {
+  private byte[] payload;
+  private KuduTable table;
+  private String payloadColumn;
+
+  public SimpleKuduEventProducer(){
+  }
+
+  @Override
+  public void configure(Context context) {
+    payloadColumn = context.getString("payloadColumn","payload");
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+
+  @Override
+  public void initialize(Event event, KuduTable table) {
+    this.payload = event.getBody();
+    this.table = table;
+  }
+
+  @Override
+  public List<Operation> getOperations() throws FlumeException {
+    try {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addBinary(payloadColumn, payload);
+
+      return Collections.singletonList((Operation) insert);
+    } catch (Exception e){
+      throw new FlumeException("Failed to create Kudu Insert object!", e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java
deleted file mode 100644
index 7166300..0000000
--- a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.kududb.flume.sink;
-
-import org.apache.flume.Event;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.conf.ConfigurableComponent;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.KuduTable;
-import org.kududb.client.Operation;
-
-import java.util.List;
-
-/**
- * Interface for an event producer which produces Kudu Operations to write
- * the headers and body of an event in a Kudu table. This is configurable,
- * so any config params required should be taken through this. The columns
- * should exist in the table specified in the configuration for the KuduSink.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface KuduEventProducer extends Configurable, ConfigurableComponent {
-  /**
-   * Initialize the event producer.
-   * @param event to be written to Kudu
-   * @param table the KuduTable object used for creating Kudu Operation objects
-   */
-  void initialize(Event event, KuduTable table);
-
-  /**
-   * Get the operations that should be written out to Kudu as a result of this
-   * event. This list is written to Kudu using the Kudu client API.
-   * @return List of {@link org.kududb.client.Operation} which
-   * are written as such to Kudu
-   */
-  List<Operation> getOperations();
-
-  /*
-   * Clean up any state. This will be called when the sink is being stopped.
-   */
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java
deleted file mode 100644
index 080cda2..0000000
--- a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.kududb.flume.sink;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.AbstractSink;
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-import org.kududb.client.AsyncKuduClient;
-import org.kududb.client.KuduClient;
-import org.kududb.client.KuduSession;
-import org.kududb.client.KuduTable;
-import org.kududb.client.Operation;
-import org.kududb.client.OperationResponse;
-import org.kududb.client.SessionConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * <p>A Flume sink that reads events from a channel and writes them to a Kudu table.
- *
- * <p><strong>Flume Kudu Sink configuration parameters</strong>
- *
- * <table cellpadding=3 cellspacing=0 border=1>
- * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
- * <tr><td>channel</td><td></td><td>Yes</td><td>The name of the Flume channel to read from.</td></tr>
- * <tr><td>type</td><td></td><td>Yes</td><td>Component name. Must be {@code org.kududb.flume.sink.KuduSink}</td></tr>
- * <tr><td>masterAddresses</td><td></td><td>Yes</td><td>Comma-separated list of "host:port" pairs of the Kudu master servers. The port is optional.</td></tr>
- * <tr><td>tableName</td><td></td><td>Yes</td><td>The name of the Kudu table to write to.</td></tr>
- * <tr><td>batchSize</td><td>100</td><td>No</td><td>The maximum number of events the sink will attempt to take from the channel per transaction.</td></tr>
- * <tr><td>ignoreDuplicateRows</td><td>true</td><td>No</td><td>Whether to ignore errors indicating that we attempted to insert duplicate rows into Kudu.</td></tr>
- * <tr><td>timeoutMillis</td><td>10000</td><td>No</td><td>Timeout period for Kudu write operations, in milliseconds.</td></tr>
- * <tr><td>producer</td><td>{@link org.kududb.flume.sink.SimpleKuduEventProducer}</td><td>No</td><td>The fully qualified class name of the {@link KuduEventProducer} the sink should use.</td></tr>
- * <tr><td>producer.*</td><td></td><td>(Varies by event producer)</td><td>Configuration properties to pass to the event producer implementation.</td></tr>
- * </table>
- *
- * <p><strong>Installation</strong>
- *
- * <p>After building the sink, in order to use it with Flume, place the file named
- * <tt>kudu-flume-sink-<em>VERSION</em>-jar-with-dependencies.jar</tt> in the
- * Flume <tt>plugins.d</tt> directory under <tt>kudu-flume-sink/lib/</tt>.
- *
- * <p>For detailed instructions on using Flume's plugins.d mechanism, please see the plugins.d
- * section of the <a href="https://flume.apache.org/FlumeUserGuide.html#the-plugins-d-directory">Flume User Guide</a>.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduSink extends AbstractSink implements Configurable {
-  private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
-  private static final Long DEFAULT_BATCH_SIZE = 100L;
-  private static final Long DEFAULT_TIMEOUT_MILLIS =
-          AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
-  private static final String DEFAULT_KUDU_EVENT_PRODUCER =
-          "org.kududb.flume.sink.SimpleKuduEventProducer";
-  private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
-
-  private String masterAddresses;
-  private String tableName;
-  private long batchSize;
-  private long timeoutMillis;
-  private boolean ignoreDuplicateRows;
-  private KuduTable table;
-  private KuduSession session;
-  private KuduClient client;
-  private KuduEventProducer eventProducer;
-  private String eventProducerType;
-  private Context producerContext;
-  private SinkCounter sinkCounter;
-
-  public KuduSink() {
-    this(null);
-  }
-
-  @VisibleForTesting
-  @InterfaceAudience.Private
-  public KuduSink(KuduClient kuduClient) {
-    this.client = kuduClient;
-  }
-
-  @Override
-  public void start() {
-    Preconditions.checkState(table == null && session == null, "Please call stop " +
-        "before calling start on an old instance.");
-
-    // The client is already non-null here only when a test injected it.
-    if (client == null) {
-      client = new KuduClient.KuduClientBuilder(masterAddresses).build();
-    }
-    session = client.newSession();
-    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
-    session.setTimeoutMillis(timeoutMillis);
-    session.setIgnoreAllDuplicateRows(ignoreDuplicateRows);
-
-    try {
-      table = client.openTable(tableName);
-    } catch (Exception e) {
-      sinkCounter.incrementConnectionFailedCount();
-      String msg = String.format("Could not open table '%s' from Kudu", tableName);
-      logger.error(msg, e);
-      throw new FlumeException(msg, e);
-    }
-
-    super.start();
-    sinkCounter.incrementConnectionCreatedCount();
-    sinkCounter.start();
-  }
-
-  @Override
-  public void stop() {
-    try {
-      if (client != null) {
-        client.shutdown();
-      }
-      client = null;
-      table = null;
-      session = null;
-    } catch (Exception e) {
-      throw new FlumeException("Error closing client.", e);
-    }
-    sinkCounter.incrementConnectionClosedCount();
-    sinkCounter.stop();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void configure(Context context) {
-    masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES);
-    tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME);
-
-    batchSize = context.getLong(
-            KuduSinkConfigurationConstants.BATCH_SIZE, DEFAULT_BATCH_SIZE);
-    timeoutMillis = context.getLong(
-            KuduSinkConfigurationConstants.TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS);
-    ignoreDuplicateRows = context.getBoolean(
-            KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS);
-    eventProducerType = context.getString(KuduSinkConfigurationConstants.PRODUCER);
-
-    Preconditions.checkNotNull(masterAddresses,
-        "Master address cannot be empty, please specify '" +
-                KuduSinkConfigurationConstants.MASTER_ADDRESSES +
-                "' in configuration file");
-    Preconditions.checkNotNull(tableName,
-        "Table name cannot be empty, please specify '" +
-                KuduSinkConfigurationConstants.TABLE_NAME +
-                "' in configuration file");
-
-    // If no event producer type is configured (null or empty), fall back to the default.
-    if (eventProducerType == null || eventProducerType.isEmpty()) {
-      eventProducerType = DEFAULT_KUDU_EVENT_PRODUCER;
-      logger.info("No Kudu event producer defined, will use default");
-    }
-
-    producerContext = new Context();
-    producerContext.putAll(context.getSubProperties(
-            KuduSinkConfigurationConstants.PRODUCER_PREFIX));
-
-    try {
-      Class<? extends KuduEventProducer> clazz =
-          (Class<? extends KuduEventProducer>)
-          Class.forName(eventProducerType);
-      eventProducer = clazz.newInstance();
-      eventProducer.configure(producerContext);
-    } catch (Exception e) {
-      logger.error("Could not instantiate Kudu event producer." , e);
-      Throwables.propagate(e);
-    }
-    sinkCounter = new SinkCounter(this.getName());
-  }
-
-  public KuduClient getClient() {
-    return client;
-  }
-
-  @Override
-  public Status process() throws EventDeliveryException {
-    if (session.hasPendingOperations()) {
-      // If for whatever reason we have pending operations then just refuse to process
-      // and tell caller to try again a bit later. We don't want to pile on the kudu
-      // session object.
-      return Status.BACKOFF;
-    }
-
-    Channel channel = getChannel();
-    Transaction txn = channel.getTransaction();
-
-    txn.begin();
-
-    try {
-      long txnEventCount = 0;
-      for (; txnEventCount < batchSize; txnEventCount++) {
-        Event event = channel.take();
-        if (event == null) {
-          break;
-        }
-
-        eventProducer.initialize(event, table);
-        List<Operation> operations = eventProducer.getOperations();
-        for (Operation o : operations) {
-          session.apply(o);
-        }
-      }
-
-      logger.debug("Flushing {} events", txnEventCount);
-      List<OperationResponse> responses = session.flush();
-      if (responses != null) {
-        for (OperationResponse response : responses) {
-          // Throw an EventDeliveryException if at least one of the responses was
-          // a row error. Row errors can occur for example when an event is inserted
-          // into Kudu successfully but the Flume transaction is rolled back for some reason,
-          // and a subsequent replay of the same Flume transaction leads to a
-          // duplicate key error since the row already exists in Kudu.
-          // (Kudu doesn't support "insert or overwrite" semantics yet.)
-          // Note: Duplicate keys will not be reported as errors if ignoreDuplicateRows
-          // is enabled in the config.
-          if (response.hasRowError()) {
-            throw new EventDeliveryException("Failed to flush one or more changes. " +
-                "Transaction rolled back: " + response.getRowError().toString());
-          }
-        }
-      }
-
-      if (txnEventCount == 0) {
-        sinkCounter.incrementBatchEmptyCount();
-      } else if (txnEventCount == batchSize) {
-        sinkCounter.incrementBatchCompleteCount();
-      } else {
-        sinkCounter.incrementBatchUnderflowCount();
-      }
-
-      txn.commit();
-
-      if (txnEventCount == 0) {
-        return Status.BACKOFF;
-      }
-
-      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
-      return Status.READY;
-
-    } catch (Throwable e) {
-      txn.rollback();
-
-      String msg = "Failed to commit transaction. Transaction rolled back.";
-      logger.error(msg, e);
-      if (e instanceof Error || e instanceof RuntimeException) {
-        Throwables.propagate(e);
-      } else {
-        logger.error(msg, e);
-        throw new EventDeliveryException(msg, e);
-      }
-    } finally {
-      txn.close();
-    }
-
-    return Status.BACKOFF;
-  }
-
-  @VisibleForTesting
-  @InterfaceAudience.Private
-  KuduEventProducer getEventProducer() {
-    return eventProducer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java
deleted file mode 100644
index 6486137..0000000
--- a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.kududb.flume.sink;
-
-import org.kududb.annotations.InterfaceAudience;
-import org.kududb.annotations.InterfaceStability;
-
-/**
- * Constants used for configuration of KuduSink
- */
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduSinkConfigurationConstants {
-  /**
-   * Comma-separated list of "host:port" pairs of the masters (port optional).
-   */
-  public static final String MASTER_ADDRESSES = "masterAddresses";
-
-  /**
-   * The name of the table in Kudu to write to.
-   */
-  public static final String TABLE_NAME = "tableName";
-
-  /**
-   * The fully qualified class name of the Kudu event producer the sink should use.
-   */
-  public static final String PRODUCER = "producer";
-
-  /**
-   * Configuration to pass to the Kudu event producer.
-   */
-  public static final String PRODUCER_PREFIX = PRODUCER + ".";
-
-  /**
-   * Maximum number of events the sink should take from the channel per
-   * transaction, if available.
-   */
-  public static final String BATCH_SIZE = "batchSize";
-
-  /**
-   * Timeout period for Kudu operations, in milliseconds.
-   */
-  public static final String TIMEOUT_MILLIS = "timeoutMillis";
-
-  /**
-   * Whether to ignore errors indicating that we attempted to insert duplicate rows into Kudu.
-   */
-  public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows";
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java
deleted file mode 100644
index b5be054..0000000
--- a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.kududb.flume.sink;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.flume.conf.ComponentConfiguration;
-import org.kududb.client.Insert;
-import org.kududb.client.KuduTable;
-import org.kududb.client.Operation;
-import org.kududb.client.PartialRow;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * <p>A simple serializer that generates one {@link Insert} per {@link Event} by writing the event
- * body into a BINARY column. The headers are discarded.
- *
- * <p><strong>Simple Kudu Event Producer configuration parameters</strong>
- *
- * <table cellpadding=3 cellspacing=0 border=1>
- * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
- * <tr><td>producer.payloadColumn</td><td>payload</td><td>No</td><td>The name of the BINARY column to write the Flume event body to.</td></tr>
- * </table>
- */
-public class SimpleKuduEventProducer implements KuduEventProducer {
-  private byte[] payload;
-  private KuduTable table;
-  private String payloadColumn;
-
-  public SimpleKuduEventProducer(){
-  }
-
-  @Override
-  public void configure(Context context) {
-    payloadColumn = context.getString("payloadColumn","payload");
-  }
-
-  @Override
-  public void configure(ComponentConfiguration conf) {
-  }
-
-  @Override
-  public void initialize(Event event, KuduTable table) {
-    this.payload = event.getBody();
-    this.table = table;
-  }
-
-  @Override
-  public List<Operation> getOperations() throws FlumeException {
-    try {
-      Insert insert = table.newInsert();
-      PartialRow row = insert.getRow();
-      row.addBinary(payloadColumn, payload);
-
-      return Collections.singletonList((Operation) insert);
-    } catch (Exception e){
-      throw new FlumeException("Failed to create Kudu Insert object!", e);
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-}


Mime
View raw message