flume-commits mailing list archives

From szabofe...@apache.org
Subject [1/2] flume git commit: FLUME-3142: Adding HBase2 sink
Date Fri, 08 Jun 2018 08:12:03 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 3ada170dd -> 719afe908


http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
new file mode 100644
index 0000000..0f482fc
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
@@ -0,0 +1,780 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase2;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestHBase2Sink {
+  private static final Logger logger =
+          LoggerFactory.getLogger(TestHBase2Sink.class);
+
+  private static final String tableName = "TestHbaseSink";
+  private static final String columnFamily = "TestColumnFamily";
+  private static final String inColumn = "iCol";
+  private static final String plCol = "pCol";
+  private static final String valBase = "testing hbase sink: jham";
+
+  private static HBaseTestingUtility testUtility;
+
+  private Configuration conf;
+
+  @BeforeClass
+  public static void setUpOnce() throws Exception {
+    String hbaseVer = org.apache.hadoop.hbase.util.VersionInfo.getVersion();
+    System.out.println("HBASE VERSION:" + hbaseVer);
+
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean("hbase.localcluster.assign.random.ports", true);
+    testUtility = new HBaseTestingUtility(conf);
+    testUtility.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws Exception {
+    testUtility.shutdownMiniCluster();
+  }
+
+  /**
+   * Creates the test table before each test. The {@link Context} helper
+   * methods below build the serializer configurations used by most tests.
+   */
+  @Before
+  public void setUp() throws IOException {
+    conf = new Configuration(testUtility.getConfiguration());
+    testUtility.createTable(TableName.valueOf(tableName), columnFamily.getBytes());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    testUtility.deleteTable(TableName.valueOf(tableName));
+  }
+
+  /**
+   * Set up {@link Context} for use with {@link SimpleHBase2EventSerializer}.
+   */
+  private Context getContextForSimpleHBase2EventSerializer() {
+    Context ctx = new Context();
+    ctx.put("table", tableName);
+    ctx.put("columnFamily", columnFamily);
+    ctx.put("serializer", SimpleHBase2EventSerializer.class.getName());
+    ctx.put("serializer.payloadColumn", plCol);
+    ctx.put("serializer.incrementColumn", inColumn);
+    return ctx;
+  }
+
+  /**
+   * Set up {@link Context} for use with {@link IncrementHBase2Serializer}.
+   */
+  private Context getContextForIncrementHBaseSerializer() {
+    Context ctx = new Context();
+    ctx.put("table", tableName);
+    ctx.put("columnFamily", columnFamily);
+    ctx.put("serializer", IncrementHBase2Serializer.class.getName());
+    return ctx;
+  }
+
+  /**
+   * Set up {@link Context} for {@link SimpleHBase2EventSerializer} without
+   * the increment and payload columns.
+   */
+  private Context getContextWithoutIncrementHBaseSerializer() {
+    // Create a context without setting the increment and payload columns.
+    Context ctx = new Context();
+    ctx.put("table", tableName);
+    ctx.put("columnFamily", columnFamily);
+    ctx.put("serializer", SimpleHBase2EventSerializer.class.getName());
+    return ctx;
+  }
+
+  @Test
+  public void testOneEventWithDefaults() throws Exception {
+    Context ctx = getContextWithoutIncrementHBaseSerializer();
+    HBase2Sink sink = new HBase2Sink(conf);
+    Configurables.configure(sink, ctx);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = EventBuilder.withBody(Bytes.toBytes(valBase));
+    channel.put(e);
+    tx.commit();
+    tx.close();
+
+    sink.process();
+    sink.stop();
+
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+         Table table = connection.getTable(TableName.valueOf(tableName))) {
+      byte[][] results = getResults(table, 1);
+      byte[] out = results[0];
+      Assert.assertArrayEquals(e.getBody(), out);
+      out = results[1];
+      Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    }
+  }
+
+  @Test
+  public void testOneEvent() throws Exception {
+    Context ctx = getContextForSimpleHBase2EventSerializer();
+    HBase2Sink sink = new HBase2Sink(conf);
+    Configurables.configure(sink, ctx);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = EventBuilder.withBody(
+        Bytes.toBytes(valBase));
+    channel.put(e);
+    tx.commit();
+    tx.close();
+
+    sink.process();
+    sink.stop();
+
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+         Table table = connection.getTable(TableName.valueOf(tableName))) {
+      byte[][] results = getResults(table, 1);
+      byte[] out = results[0];
+      Assert.assertArrayEquals(e.getBody(), out);
+      out = results[1];
+      Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    }
+  }
+
+  @Test
+  public void testThreeEvents() throws Exception {
+    Context ctx = getContextForSimpleHBase2EventSerializer();
+    ctx.put("batchSize", "3");
+    HBase2Sink sink = new HBase2Sink(conf);
+    Configurables.configure(sink, ctx);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < 3; i++) {
+      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+      channel.put(e);
+    }
+    tx.commit();
+    tx.close();
+    sink.process();
+    sink.stop();
+
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+        Table table = connection.getTable(TableName.valueOf(tableName))) {
+      byte[][] results = getResults(table, 3);
+      byte[] out;
+      int found = 0;
+      for (int i = 0; i < 3; i++) {
+        for (int j = 0; j < 3; j++) {
+          if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
+            found++;
+            break;
+          }
+        }
+      }
+      Assert.assertEquals(3, found);
+      out = results[3];
+      Assert.assertArrayEquals(Longs.toByteArray(3), out);
+    }
+  }
+
+  @Test
+  public void testMultipleBatches() throws Exception {
+    Context ctx = getContextForSimpleHBase2EventSerializer();
+    ctx.put("batchSize", "2");
+    HBase2Sink sink = new HBase2Sink(conf);
+    Configurables.configure(sink, ctx);
+    // Reset the context to a higher batchSize; the sink was already
+    // configured with batchSize 2, so this has no effect on it.
+    ctx.put("batchSize", "100");
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < 3; i++) {
+      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+      channel.put(e);
+    }
+    tx.commit();
+    tx.close();
+    int count = 0;
+    while (sink.process() != Status.BACKOFF) {
+      count++;
+    }
+    sink.stop();
+    Assert.assertEquals(2, count);
+
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+         Table table = connection.getTable(TableName.valueOf(tableName))) {
+      byte[][] results = getResults(table, 3);
+      byte[] out;
+      int found = 0;
+      for (int i = 0; i < 3; i++) {
+        for (int j = 0; j < 3; j++) {
+          if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
+            found++;
+            break;
+          }
+        }
+      }
+      Assert.assertEquals(3, found);
+      out = results[3];
+      Assert.assertArrayEquals(Longs.toByteArray(3), out);
+    }
+  }
+
+  @Test(expected = FlumeException.class)
+  public void testMissingTable() throws Exception {
+    logger.info("Running testMissingTable()");
+    Context ctx = getContextForSimpleHBase2EventSerializer();
+
+    // setUp() will create the table, so we delete it.
+    logger.info("Deleting table {}", tableName);
+    testUtility.deleteTable(TableName.valueOf(tableName));
+
+    ctx.put("batchSize", "2");
+    HBase2Sink sink = new HBase2Sink(conf);
+    Configurables.configure(sink, ctx);
+    // Reset the context to a higher batchSize; the sink was already
+    // configured with batchSize 2, so this has no effect on it.
+    ctx.put("batchSize", "100");
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+
+    logger.info("Writing data into channel");
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < 3; i++) {
+      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+      channel.put(e);
+    }
+    tx.commit();
+    tx.close();
+
+    logger.info("Starting sink and processing events");
+    try {
+      logger.info("Calling sink.start()");
+      sink.start(); // This method will throw.
+
+      // We never get here, but we log in case the behavior changes.
+      logger.error("Unexpected error: Calling sink.process()");
+      sink.process();
+      logger.error("Unexpected error: Calling sink.stop()");
+      sink.stop();
+    } finally {
+      // Re-create the table so tearDown() doesn't throw.
+      testUtility.createTable(TableName.valueOf(tableName), columnFamily.getBytes());
+    }
+
+    // FIXME: The test should never get here; the code below doesn't run.
+    Assert.fail();
+  }
+
+  // TODO: Move this test to a different class and run it stand-alone.
+
+  /**
+   * This test must run last - it shuts down the minicluster :D
+   *
+   * @throws Exception
+   */
+  @Ignore("For dev builds only:" +
+          "This test takes too long, and this has to be run after all other" +
+          "tests, since it shuts down the minicluster. " +
+          "Comment out all other tests" +
+          "and uncomment this annotation to run this test.")
+  @Test(expected = EventDeliveryException.class)
+  public void testHBaseFailure() throws Exception {
+    Context ctx = getContextForSimpleHBase2EventSerializer();
+    ctx.put("batchSize", "2");
+    HBase2Sink sink = new HBase2Sink(conf);
+    Configurables.configure(sink, ctx);
+    // Reset the context to a higher batchSize; the sink was already
+    // configured with batchSize 2, so this has no effect on it.
+    ctx.put("batchSize", "100");
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < 3; i++) {
+      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+      channel.put(e);
+    }
+    tx.commit();
+    tx.close();
+    sink.process();
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+         Table table = connection.getTable(TableName.valueOf(tableName))) {
+      byte[][] results = getResults(table, 2);
+      byte[] out;
+      int found = 0;
+      for (int i = 0; i < 2; i++) {
+        for (int j = 0; j < 2; j++) {
+          if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
+            found++;
+            break;
+          }
+        }
+      }
+      Assert.assertEquals(2, found);
+      out = results[2];
+      Assert.assertArrayEquals(Longs.toByteArray(2), out);
+    }
+    testUtility.shutdownMiniCluster();
+    sink.process();
+    sink.stop();
+  }
+
+  /**
+   * Makes HBase scans to get rows in the payload column and increment column
+   * in the table given. Expensive, so tread lightly: calling this function
+   * multiple times for the same result set is a bad idea; cache the result
+   * set once it is returned by this function.
+   *
+   * @param table the table to scan
+   * @param numEvents number of events inserted into the table
+   * @return the numEvents payload values followed by the increment value
+   * @throws IOException if a scan fails
+   */
+  private byte[][] getResults(Table table, int numEvents) throws IOException {
+    byte[][] results = new byte[numEvents + 1][];
+    Scan scan = new Scan();
+    scan.addColumn(columnFamily.getBytes(), plCol.getBytes());
+    scan.withStartRow(Bytes.toBytes("default"));
+    ResultScanner rs = table.getScanner(scan);
+    byte[] out;
+    int i = 0;
+    try {
+      for (Result r = rs.next(); r != null; r = rs.next()) {
+        out = r.getValue(columnFamily.getBytes(), plCol.getBytes());
+
+        if (i >= results.length - 1) {
+          rs.close();
+          throw new FlumeException("More results than expected in the table. " +
+                                   "Expected = " + numEvents + ". Found = " + i);
+        }
+        results[i++] = out;
+        System.out.println(Bytes.toString(out));
+      }
+    } finally {
+      rs.close();
+    }
+
+    Assert.assertEquals(i, results.length - 1);
+    scan = new Scan();
+    scan.addColumn(columnFamily.getBytes(), inColumn.getBytes());
+    scan.withStartRow(Bytes.toBytes("incRow"));
+    rs = table.getScanner(scan);
+    try {
+      for (Result r = rs.next(); r != null; r = rs.next()) {
+        out = r.getValue(columnFamily.getBytes(), inColumn.getBytes());
+        results[i++] = out;
+        System.out.println(Bytes.toLong(out)); // increment values are 8-byte longs
+      }
+    } finally {
+      rs.close();
+    }
+    return results;
+  }
+
+  @Test
+  public void testTransactionStateOnChannelException() throws Exception {
+    Context ctx = getContextForSimpleHBase2EventSerializer();
+    ctx.put("batchSize", "1");
+
+    HBase2Sink sink = new HBase2Sink(conf);
+    Configurables.configure(sink, ctx);
+    Channel channel = spy(new MemoryChannel());
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + 0));
+    channel.put(e);
+    tx.commit();
+    tx.close();
+    doThrow(new ChannelException("Mock Exception")).when(channel).take();
+    try {
+      sink.process();
+      Assert.fail("take() method should throw exception");
+    } catch (ChannelException ex) {
+      Assert.assertEquals("Mock Exception", ex.getMessage());
+    }
+    doReturn(e).when(channel).take();
+    sink.process();
+    sink.stop();
+
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+         Table table = connection.getTable(TableName.valueOf(tableName))) {
+      byte[][] results = getResults(table, 1);
+      byte[] out = results[0];
+      Assert.assertArrayEquals(e.getBody(), out);
+      out = results[1];
+      Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    }
+  }
+
+  @Test
+  public void testTransactionStateOnSerializationException() throws Exception {
+    Context ctx = getContextForSimpleHBase2EventSerializer();
+    ctx.put("batchSize", "1");
+    ctx.put(HBase2SinkConfigurationConstants.CONFIG_SERIALIZER,
+            "org.apache.flume.sink.hbase2.MockSimpleHBase2EventSerializer");
+
+    HBase2Sink sink = new HBase2Sink(conf);
+    Configurables.configure(sink, ctx);
+    // Reset the context to a higher batchSize; the sink was already
+    // configured with batchSize 1, so this has no effect on it.
+    ctx.put("batchSize", "100");
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + 0));
+    channel.put(e);
+    tx.commit();
+    tx.close();
+    try {
+      MockSimpleHBase2EventSerializer.throwException = true;
+      sink.process();
+      Assert.fail("FlumeException expected from serializer");
+    } catch (FlumeException ex) {
+      Assert.assertEquals("Exception for testing", ex.getMessage());
+    }
+    MockSimpleHBase2EventSerializer.throwException = false;
+    sink.process();
+    sink.stop();
+
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+         Table table = connection.getTable(TableName.valueOf(tableName))) {
+      byte[][] results = getResults(table, 1);
+      byte[] out = results[0];
+      Assert.assertArrayEquals(e.getBody(), out);
+      out = results[1];
+      Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    }
+  }
+
+  @Test
+  public void testWithoutConfigurationObject() throws Exception {
+    Context ctx = getContextForSimpleHBase2EventSerializer();
+    Context tmpContext = new Context(ctx.getParameters());
+    tmpContext.put("batchSize", "2");
+    tmpContext.put(HBase2SinkConfigurationConstants.ZK_QUORUM,
+                   ZKConfig.getZKQuorumServersString(conf));
+    tmpContext.put(HBase2SinkConfigurationConstants.ZK_ZNODE_PARENT,
+                   conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
+
+    HBase2Sink sink = new HBase2Sink();
+    Configurables.configure(sink, tmpContext);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, ctx);
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < 3; i++) {
+      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+      channel.put(e);
+    }
+    tx.commit();
+    tx.close();
+    Status status = Status.READY;
+    while (status != Status.BACKOFF) {
+      status = sink.process();
+    }
+    sink.stop();
+
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+         Table table = connection.getTable(TableName.valueOf(tableName))) {
+      byte[][] results = getResults(table, 3);
+      byte[] out;
+      int found = 0;
+      for (int i = 0; i < 3; i++) {
+        for (int j = 0; j < 3; j++) {
+          if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
+            found++;
+            break;
+          }
+        }
+      }
+      Assert.assertEquals(3, found);
+      out = results[3];
+      Assert.assertArrayEquals(Longs.toByteArray(3), out);
+    }
+  }
+
+  @Test
+  public void testZKQuorum() throws Exception {
+    Context ctx = getContextForSimpleHBase2EventSerializer();
+    Context tmpContext = new Context(ctx.getParameters());
+    String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " +
+                      "zk3.flume.apache.org:3342";
+    tmpContext.put("batchSize", "2");
+    tmpContext.put(HBase2SinkConfigurationConstants.ZK_QUORUM, zkQuorum);
+    tmpContext.put(HBase2SinkConfigurationConstants.ZK_ZNODE_PARENT,
+                   conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
+    HBase2Sink sink = new HBase2Sink();
+    Configurables.configure(sink, tmpContext);
+    Assert.assertEquals("zk1.flume.apache.org,zk2.flume.apache.org," +
+                        "zk3.flume.apache.org", sink.getConfig().get(HConstants.ZOOKEEPER_QUORUM));
+    Assert.assertEquals(String.valueOf(3342),
+                        sink.getConfig().get(HConstants.ZOOKEEPER_CLIENT_PORT));
+  }
+
+  @Test(expected = FlumeException.class)
+  public void testZKQuorumIncorrectPorts() throws Exception {
+    Context ctx = getContextForSimpleHBase2EventSerializer();
+    Context tmpContext = new Context(ctx.getParameters());
+
+    String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " +
+                      "zk3.flume.apache.org:3342";
+    tmpContext.put("batchSize", "2");
+    tmpContext.put(HBase2SinkConfigurationConstants.ZK_QUORUM, zkQuorum);
+    tmpContext.put(HBase2SinkConfigurationConstants.ZK_ZNODE_PARENT,
+                   conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
+    HBase2Sink sink = new HBase2Sink();
+    Configurables.configure(sink, tmpContext);
+    Assert.fail();
+  }
+
+  @Test
+  public void testCoalesce() throws EventDeliveryException {
+    Context ctx = getContextForIncrementHBaseSerializer();
+    ctx.put("batchSize", "100");
+    ctx.put(HBase2SinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+        String.valueOf(true));
+
+    final Map<String, Long> expectedCounts = Maps.newHashMap();
+    expectedCounts.put("r1:c1", 10L);
+    expectedCounts.put("r1:c2", 20L);
+    expectedCounts.put("r2:c1", 7L);
+    expectedCounts.put("r2:c3", 63L);
+    HBase2Sink.DebugIncrementsCallback cb = new CoalesceValidator(expectedCounts);
+
+    HBase2Sink sink = new HBase2Sink(testUtility.getConfiguration(), cb);
+    Configurables.configure(sink, ctx);
+    Channel channel = createAndConfigureMemoryChannel(sink);
+
+    List<Event> events = Lists.newLinkedList();
+    generateEvents(events, expectedCounts);
+    putEvents(channel, events);
+
+    sink.start();
+    sink.process(); // Calls CoalesceValidator instance.
+    sink.stop();
+  }
+
+  @Test(expected = AssertionError.class)
+  public void negativeTestCoalesce() throws EventDeliveryException {
+    Context ctx = getContextForIncrementHBaseSerializer();
+    ctx.put("batchSize", "10");
+
+    final Map<String, Long> expectedCounts = Maps.newHashMap();
+    expectedCounts.put("r1:c1", 10L);
+    HBase2Sink.DebugIncrementsCallback cb = new CoalesceValidator(expectedCounts);
+
+    HBase2Sink sink = new HBase2Sink(testUtility.getConfiguration(), cb);
+    Configurables.configure(sink, ctx);
+    Channel channel = createAndConfigureMemoryChannel(sink);
+
+    List<Event> events = Lists.newLinkedList();
+    generateEvents(events, expectedCounts);
+    putEvents(channel, events);
+
+    sink.start();
+    sink.process(); // Calls CoalesceValidator instance.
+    sink.stop();
+  }
+
+  @Test
+  public void testBatchAware() throws EventDeliveryException {
+    logger.info("Running testBatchAware()");
+    Context ctx = getContextForIncrementHBaseSerializer();
+    HBase2Sink sink = new HBase2Sink(testUtility.getConfiguration());
+    Configurables.configure(sink, ctx);
+    Channel channel = createAndConfigureMemoryChannel(sink);
+
+    sink.start();
+    int batchCount = 3;
+    for (int i = 0; i < batchCount; i++) {
+      sink.process();
+    }
+    sink.stop();
+    Assert.assertEquals(batchCount,
+        ((IncrementHBase2Serializer) sink.getSerializer()).getNumBatchesStarted());
+  }
+
+  @Test(expected = ConfigurationException.class)
+  public void testHBaseVersionCheck() throws Exception {
+    Context ctx = getContextWithoutIncrementHBaseSerializer();
+    HBase2Sink sink = mock(HBase2Sink.class);
+    doCallRealMethod().when(sink).configure(any());
+    when(sink.getHBbaseVersionString()).thenReturn("1.0.0");
+    Configurables.configure(sink, ctx);
+  }
+
+  @Test(expected = ConfigurationException.class)
+  public void testHBaseVersionCheckNotANumber() throws Exception {
+    Context ctx = getContextWithoutIncrementHBaseSerializer();
+    HBase2Sink sink = mock(HBase2Sink.class);
+    doCallRealMethod().when(sink).configure(any());
+    when(sink.getHBbaseVersionString()).thenReturn("Dummy text");
+    Configurables.configure(sink, ctx);
+  }
+
+  /**
+   * For testing that the rows coalesced, serialized by
+   * {@link IncrementHBase2Serializer}, are of the expected batch size.
+   */
+  private static class CoalesceValidator
+      implements HBase2Sink.DebugIncrementsCallback {
+
+    private final Map<String,Long> expectedCounts;
+
+    public CoalesceValidator(Map<String, Long> expectedCounts) {
+      this.expectedCounts = expectedCounts;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void onAfterCoalesce(Iterable<Increment> increments) {
+      for (Increment inc : increments) {
+        byte[] row = inc.getRow();
+        Map<byte[], NavigableMap<byte[], Long>> families = null;
+        try {
+          families = inc.getFamilyMapOfLongs();
+        } catch (Exception e) {
+          Throwables.propagate(e);
+        }
+        assert families != null;
+        for (byte[] family : families.keySet()) {
+          NavigableMap<byte[], Long> qualifiers = families.get(family);
+          for (Map.Entry<byte[], Long> entry : qualifiers.entrySet()) {
+            byte[] qualifier = entry.getKey();
+            Long count = entry.getValue();
+            String key = new String(row, Charsets.UTF_8) +
+                    ':' +
+                    new String(qualifier, Charsets.UTF_8);
+            Assert.assertEquals("Expected counts don't match observed for " + key,
+                expectedCounts.get(key), count);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Adds events to the list: as many per row:qualifier key as counts specifies.
+   * @param events Destination list.
+   * @param counts How many events to generate for each row:qualifier pair.
+   */
+  private void generateEvents(List<Event> events, Map<String, Long> counts) {
+    for (String key : counts.keySet()) {
+      long count = counts.get(key);
+      for (long i = 0; i < count; i++) {
+        events.add(EventBuilder.withBody(key, Charsets.UTF_8));
+      }
+    }
+  }
+
+  private Channel createAndConfigureMemoryChannel(HBase2Sink sink) {
+    Channel channel = new MemoryChannel();
+    Context channelCtx = new Context();
+    channelCtx.put("capacity", String.valueOf(1000L));
+    channelCtx.put("transactionCapacity", String.valueOf(1000L));
+    Configurables.configure(channel, channelCtx);
+    sink.setChannel(channel);
+    channel.start();
+    return channel;
+  }
+
+  private void putEvents(Channel channel, Iterable<Event> events) {
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (Event event : events) {
+      channel.put(event);
+    }
+    tx.commit();
+    tx.close();
+  }
+
+}
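
For reference, the keys these tests set programmatically via Context map onto
a sink definition in a Flume agent properties file. A minimal sketch follows,
using the "hbase2" sink type that TestHBase2SinkCreation resolves through
DefaultSinkFactory; the agent, channel, and sink names (a1, c1, k1) are
illustrative only and not part of this commit.

    # Sketch only: a1/c1/k1 are hypothetical names.
    a1.channels = c1
    a1.sinks = k1
    a1.channels.c1.type = memory
    a1.sinks.k1.type = hbase2
    a1.sinks.k1.table = TestHbaseSink
    a1.sinks.k1.columnFamily = TestColumnFamily
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer
    a1.sinks.k1.serializer.payloadColumn = pCol
    a1.sinks.k1.serializer.incrementColumn = iCol
    a1.sinks.k1.batchSize = 100
    a1.sinks.k1.channel = c1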

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2SinkCreation.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2SinkCreation.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2SinkCreation.java
new file mode 100644
index 0000000..02127ba
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2SinkCreation.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase2;
+
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkFactory;
+import org.apache.flume.sink.DefaultSinkFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHBase2SinkCreation {
+
+  private SinkFactory sinkFactory;
+
+  @Before
+  public void setUp() {
+    sinkFactory = new DefaultSinkFactory();
+  }
+
+  private void verifySinkCreation(Class<?> typeClass) throws FlumeException {
+    Sink sink = sinkFactory.create("hbase2-sink", "hbase2");
+    Assert.assertNotNull(sink);
+    Assert.assertTrue(typeClass.isInstance(sink));
+  }
+
+  @Test
+  public void testSinkCreation() {
+    verifySinkCreation(HBase2Sink.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestRegexHBase2EventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestRegexHBase2EventSerializer.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestRegexHBase2EventSerializer.java
new file mode 100644
index 0000000..ee9f739
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestRegexHBase2EventSerializer.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase2;
+
+import com.google.common.collect.Maps;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestRegexHBase2EventSerializer {
+
+  /* Ensure that when no config is specified, a catch-all regex is used
+   * with the default column name. */
+  @Test
+  public void testDefaultBehavior() throws Exception {
+    RegexHBase2EventSerializer s = new RegexHBase2EventSerializer();
+    Context context = new Context();
+    s.configure(context);
+    String logMsg = "The sky is falling!";
+    Event e = EventBuilder.withBody(Bytes.toBytes(logMsg));
+    s.initialize(e, "CF".getBytes());
+    List<Row> actions = s.getActions();
+    assertTrue(actions.size() == 1);
+    assertTrue(actions.get(0) instanceof Put);
+    Put put = (Put) actions.get(0);
+
+    assertTrue(put.getFamilyCellMap().containsKey(s.cf));
+    List<Cell> cells = put.getFamilyCellMap().get(s.cf);
+    assertTrue(cells.size() == 1);
+
+    Map<String, String> resultMap = Maps.newHashMap();
+    for (Cell cell : cells) {
+      resultMap.put(new String(CellUtil.cloneQualifier(cell)),
+          new String(CellUtil.cloneValue(cell)));
+    }
+
+    assertTrue(resultMap.containsKey(
+        RegexHBase2EventSerializer.COLUMN_NAME_DEFAULT));
+    assertEquals("The sky is falling!",
+        resultMap.get(RegexHBase2EventSerializer.COLUMN_NAME_DEFAULT));
+  }
+
+  @Test
+  public void testRowIndexKey() throws Exception {
+    RegexHBase2EventSerializer s = new RegexHBase2EventSerializer();
+    Context context = new Context();
+    context.put(RegexHBase2EventSerializer.REGEX_CONFIG,
+        "^([^\t]+)\t([^\t]+)\t([^\t]+)$");
+    context.put(RegexHBase2EventSerializer.COL_NAME_CONFIG, "col1,col2,ROW_KEY");
+    context.put("rowKeyIndex", "2");
+    s.configure(context);
+
+    String body = "val1\tval2\trow1";
+    Event e = EventBuilder.withBody(Bytes.toBytes(body));
+    s.initialize(e, "CF".getBytes());
+    List<Row> actions = s.getActions();
+
+    Put put = (Put)actions.get(0);
+
+    List<Cell> cells = put.getFamilyCellMap().get(s.cf);
+    assertTrue(cells.size() == 2);
+
+    Map<String, String> resultMap = Maps.newHashMap();
+    for (Cell cell : cells) {
+      resultMap.put(new String(CellUtil.cloneQualifier(cell)),
+          new String(CellUtil.cloneValue(cell)));
+    }
+    assertEquals("val1", resultMap.get("col1"));
+    assertEquals("val2", resultMap.get("col2"));
+    assertEquals("row1", Bytes.toString(put.getRow()));
+  }
+
+  /* Test a common case where a regex is used to parse the Apache log format. */
+  @Test
+  public void testApacheRegex() throws Exception {
+    RegexHBase2EventSerializer s = new RegexHBase2EventSerializer();
+    Context context = new Context();
+    context.put(RegexHBase2EventSerializer.REGEX_CONFIG,
+        "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) \"([^ ]+) ([^ ]+)" +
+        " ([^\"]+)\" (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\")" +
+        " ([^ \"]*|\"[^\"]*\"))?");
+    context.put(RegexHBase2EventSerializer.COL_NAME_CONFIG,
+        "host,identity,user,time,method,request,protocol,status,size," +
+        "referer,agent");
+    s.configure(context);
+    String logMsg = "33.22.11.00 - - [20/May/2011:07:01:19 +0000] " +
+        "\"GET /wp-admin/css/install.css HTTP/1.0\" 200 813 " +
+        "\"http://www.cloudera.com/wp-admin/install.php\" \"Mozilla/5.0 (comp" +
+        "atible; Yahoo! Slurp; http://help.yahoo.com/help/us/ysearch/slurp)\"";
+
+    Event e = EventBuilder.withBody(Bytes.toBytes(logMsg));
+    s.initialize(e, "CF".getBytes());
+    List<Row> actions = s.getActions();
+    assertEquals(1, s.getActions().size());
+    assertTrue(actions.get(0) instanceof Put);
+
+    Put put = (Put) actions.get(0);
+    assertTrue(put.getFamilyCellMap().containsKey(s.cf));
+    List<Cell> cells = put.getFamilyCellMap().get(s.cf);
+    assertTrue(cells.size() == 11);
+
+    Map<String, String> resultMap = Maps.newHashMap();
+    for (Cell cell : cells) {
+      resultMap.put(new String(CellUtil.cloneQualifier(cell)),
+          new String(CellUtil.cloneValue(cell)));
+    }
+
+    assertEquals("33.22.11.00", resultMap.get("host"));
+    assertEquals("-", resultMap.get("identity"));
+    assertEquals("-", resultMap.get("user"));
+    assertEquals("[20/May/2011:07:01:19 +0000]", resultMap.get("time"));
+    assertEquals("GET", resultMap.get("method"));
+    assertEquals("/wp-admin/css/install.css", resultMap.get("request"));
+    assertEquals("HTTP/1.0", resultMap.get("protocol"));
+    assertEquals("200", resultMap.get("status"));
+    assertEquals("813", resultMap.get("size"));
+    assertEquals("\"http://www.cloudera.com/wp-admin/install.php\"",
+        resultMap.get("referer"));
+    assertEquals("\"Mozilla/5.0 (compatible; Yahoo! Slurp; " +
+        "http://help.yahoo.com/help/us/ysearch/slurp)\"",
+        resultMap.get("agent"));
+
+    List<Increment> increments = s.getIncrements();
+    assertEquals(0, increments.size());
+  }
+
+  @Test
+  public void testRowKeyGeneration() {
+    Context context = new Context();
+    RegexHBase2EventSerializer s1 = new RegexHBase2EventSerializer();
+    s1.configure(context);
+    RegexHBase2EventSerializer s2 = new RegexHBase2EventSerializer();
+    s2.configure(context);
+
+    // Reset shared nonce to zero
+    RegexHBase2EventSerializer.nonce.set(0);
+    String randomString = RegexHBase2EventSerializer.randomKey;
+
+    Event e1 = EventBuilder.withBody(Bytes.toBytes("body"));
+    Event e2 = EventBuilder.withBody(Bytes.toBytes("body"));
+    Event e3 = EventBuilder.withBody(Bytes.toBytes("body"));
+
+    Calendar cal = mock(Calendar.class);
+    when(cal.getTimeInMillis()).thenReturn(1L);
+
+    s1.initialize(e1, "CF".getBytes());
+    String rk1 = new String(s1.getRowKey(cal));
+    assertEquals("1-" + randomString + "-0", rk1);
+
+    when(cal.getTimeInMillis()).thenReturn(10L);
+    s1.initialize(e2, "CF".getBytes());
+    String rk2 = new String(s1.getRowKey(cal));
+    assertEquals("10-" + randomString + "-1", rk2);
+
+    when(cal.getTimeInMillis()).thenReturn(100L);
+    s2.initialize(e3, "CF".getBytes());
+    String rk3 = new String(s2.getRowKey(cal));
+    assertEquals("100-" + randomString + "-2", rk3);
+  }
+
+  /* Test depositing of the header information. */
+  @Test
+  public void testDepositHeaders() throws Exception {
+    Charset charset = Charset.forName("KOI8-R");
+    RegexHBase2EventSerializer s = new RegexHBase2EventSerializer();
+    Context context = new Context();
+    context.put(RegexHBase2EventSerializer.DEPOSIT_HEADERS_CONFIG,
+        "true");
+    context.put(RegexHBase2EventSerializer.CHARSET_CONFIG,
+               charset.toString());
+    s.configure(context);
+
+    String body = "body";
+    Map<String, String> headers = Maps.newHashMap();
+    headers.put("header1", "value1");
+    headers.put("заголовок2", "значение2");
+
+    Event e = EventBuilder.withBody(Bytes.toBytes(body), headers);
+    s.initialize(e, "CF".getBytes());
+    List<Row> actions = s.getActions();
+    assertEquals(1, s.getActions().size());
+    assertTrue(actions.get(0) instanceof Put);
+
+    Put put = (Put) actions.get(0);
+    assertTrue(put.getFamilyCellMap().containsKey(s.cf));
+    List<Cell> cells = put.getFamilyCellMap().get(s.cf);
+    assertTrue(cells.size() == 3);
+
+    Map<String, byte[]> resultMap = Maps.newHashMap();
+    for (Cell cell : cells) {
+      resultMap.put(new String(CellUtil.cloneQualifier(cell), charset),
+              CellUtil.cloneValue(cell));
+    }
+
+    assertEquals(body,
+                 new String(resultMap.get(RegexHBase2EventSerializer.COLUMN_NAME_DEFAULT),
+                 charset));
+    assertEquals("value1", new String(resultMap.get("header1"), charset));
+    assertArrayEquals("значение2".getBytes(charset), resultMap.get("заголовок2"));
+    assertEquals("значение2".length(), resultMap.get("заголовок2").length);
+
+    List<Increment> increments = s.getIncrements();
+    assertEquals(0, increments.size());
+  }
+}
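
The regex serializer settings these tests pass through Context would normally
arrive via the sink's "serializer." prefix in an agent configuration. A sketch
mirroring testRowIndexKey() follows; "rowKeyIndex" appears verbatim in the
test, while the "regex" and "colNames" key names are assumed to match the
HBase1 RegexHbaseEventSerializer constants and should be verified against
RegexHBase2EventSerializer before relying on them.

    # Sketch only: key names "regex" and "colNames" are assumed.
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
    a1.sinks.k1.serializer.regex = ^([^\t]+)\t([^\t]+)\t([^\t]+)$
    a1.sinks.k1.serializer.colNames = col1,col2,ROW_KEY
    a1.sinks.k1.serializer.rowKeyIndex = 2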

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index 7740ad7..cbb6db1 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -44,6 +44,7 @@ limitations under the License.
     <module>flume-hdfs-sink</module>
     <module>flume-irc-sink</module>
     <module>flume-ng-hbase-sink</module>
+    <module>flume-ng-hbase2-sink</module>
     <module>flume-ng-elasticsearch-sink</module>
     <module>flume-ng-morphline-solr-sink</module>
     <module>flume-ng-kafka-sink</module>

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0f656a5..597ea43 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,8 @@ limitations under the License.
     <guava.version>18.0</guava.version>
     <guava-old.version>11.0.2</guava-old.version>
     <hadoop2.version>2.9.0</hadoop2.version>
+    <hbase2.jetty.version>9.3.19.v20170502</hbase2.jetty.version>
+    <hbase2.version>2.0.0</hbase2.version>
     <httpcore.version>4.4.6</httpcore.version>
     <httpclient.version>4.5.3</httpclient.version>
     <irclib.version>1.10</irclib.version>
@@ -1483,6 +1485,12 @@ limitations under the License.
 
       <dependency>
         <groupId>org.apache.flume.flume-ng-sinks</groupId>
+        <artifactId>flume-ng-hbase2-sink</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.flume.flume-ng-sinks</groupId>
         <artifactId>flume-ng-elasticsearch-sink</artifactId>
         <version>${project.version}</version>
       </dependency>
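
With the artifact now managed in the root pom above, a consuming module (a
hypothetical example below) can declare the dependency without repeating the
version, which dependencyManagement supplies:

    <dependency>
      <groupId>org.apache.flume.flume-ng-sinks</groupId>
      <artifactId>flume-ng-hbase2-sink</artifactId>
    </dependency>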

