flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1757. Improve configuration of hbase serializers.
Date Tue, 21 May 2013 03:15:00 GMT
Updated Branches:
  refs/heads/flume-1.4 309e4cf3c -> 079039004


FLUME-1757. Improve configuration of hbase serializers.

(Sravya Tirukkovalur via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/07903900
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/07903900
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/07903900

Branch: refs/heads/flume-1.4
Commit: 079039004b4dc37cfb90ac407557a4853c3a447a
Parents: 309e4cf
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Mon May 20 20:12:38 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Mon May 20 20:14:03 2013 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    2 +-
 .../hbase/SimpleAsyncHbaseEventSerializer.java     |    4 +-
 .../sink/hbase/SimpleHbaseEventSerializer.java     |    4 +-
 .../flume/sink/hbase/TestAsyncHBaseSink.java       |   42 ++++++++++++++-
 .../org/apache/flume/sink/hbase/TestHBaseSink.java |   41 +++++++++++++-
 5 files changed, 84 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/07903900/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 2ee41be..1b4d216 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1697,7 +1697,7 @@ Property Name       Default                                        
        Desc
 **table**           --                                                      The name of the
table in Hbase to write to.
 **columnFamily**    --                                                      The column family
in Hbase to write to.
 batchSize           100                                                     Number of events
to be written per txn.
-serializer          org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
+serializer          org.apache.flume.sink.hbase.SimpleHbaseEventSerializer  Default increment
column = "iCol", payload column = "pCol".
 serializer.*        --                                                      Properties to
be passed to the serializer.
 kerberosPrincipal   --                                                      Kerberos user
principal for accessing secure HBase
 kerberosKeytab      --                                                      Kerberos keytab
for accessing secure HBase

http://git-wip-us.apache.org/repos/asf/flume/blob/07903900/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
index dd19616..96095d1 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java
@@ -115,8 +115,8 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize
 
   @Override
   public void configure(Context context) {
-    String pCol = context.getString("payloadColumn");
-    String iCol = context.getString("incrementColumn");
+    String pCol = context.getString("payloadColumn", "pCol");
+    String iCol = context.getString("incrementColumn", "iCol");
     rowPrefix = context.getString("rowPrefix", "default");
     String suffix = context.getString("suffix", "uuid");
     if(pCol != null && !pCol.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/flume/blob/07903900/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
index 52bc84d..758252b 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java
@@ -70,8 +70,8 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer
{
         context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
     String suffix = context.getString("suffix", "uuid");
 
-    String payloadColumn = context.getString("payloadColumn");
-    String incColumn = context.getString("incrementColumn");
+    String payloadColumn = context.getString("payloadColumn","pCol");
+    String incColumn = context.getString("incrementColumn","iCol");
     if(payloadColumn != null && !payloadColumn.isEmpty()) {
       if(suffix.equals("timestamp")){
         keyType = KeyType.TS;

http://git-wip-us.apache.org/repos/asf/flume/blob/07903900/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index 03c3e4c..7ddfdae 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -69,8 +69,8 @@ public class TestAsyncHBaseSink {
 
   private static String tableName = "TestHbaseSink";
   private static String columnFamily = "TestColumnFamily";
-  private static String inColumn = "Increment";
-  private static String plCol = "pc";
+  private static String inColumn = "iCol";
+  private static String plCol = "pCol";
   private static Context ctx = new Context();
   private static String valBase = "testing hbase sink: jham";
   private boolean deleteTable = true;
@@ -164,6 +164,44 @@ public class TestAsyncHBaseSink {
   }
 
   @Test
+  public void testOneEventWithDefaults() throws Exception {
+    Map<String,String> ctxMap = new HashMap<String,String>();
+    ctxMap.put("table", tableName);
+    ctxMap.put("columnFamily", columnFamily);
+    ctxMap.put("serializer",
+            "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer");
+    ctxMap.put("keep-alive", "0");
+    ctxMap.put("timeout", "10000");
+    Context tmpctx = new Context();
+    tmpctx.putAll(ctxMap);
+
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    deleteTable = true;
+    AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
+    Configurables.configure(sink, tmpctx);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, tmpctx);
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = EventBuilder.withBody(
+            Bytes.toBytes(valBase));
+    channel.put(e);
+    tx.commit();
+    tx.close();
+    Assert.assertFalse(sink.isConfNull());
+    sink.process();
+    sink.stop();
+    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    byte[][] results = getResults(table, 1);
+    byte[] out = results[0];
+    Assert.assertArrayEquals(e.getBody(), out);
+    out = results[1];
+    Assert.assertArrayEquals(Longs.toByteArray(1), out);
+  }
+
+  @Test
   public void testOneEvent() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     deleteTable = true;

http://git-wip-us.apache.org/repos/asf/flume/blob/07903900/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index ad94fc9..ab4128e 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -55,8 +55,8 @@ public class TestHBaseSink {
   private static HBaseTestingUtility testUtility = new HBaseTestingUtility();
   private static String tableName = "TestHbaseSink";
   private static String columnFamily = "TestColumnFamily";
-  private static String inColumn = "Increment";
-  private static String plCol = "pc";
+  private static String inColumn = "iCol";
+  private static String plCol = "pCol";
   private static Context ctx = new Context();
   private static String valBase = "testing hbase sink: jham";
 
@@ -80,6 +80,43 @@ public class TestHBaseSink {
   }
 
   @Test
+  public void testOneEventWithDefaults() throws Exception {
+    //Create a context without setting increment column and payload Column
+    Map<String,String> ctxMap = new HashMap<String,String>();
+    ctxMap.put("table", tableName);
+    ctxMap.put("columnFamily", columnFamily);
+    ctxMap.put("serializer",
+            "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer");
+    Context tmpctx = new Context();
+    tmpctx.putAll(ctxMap);
+
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    Configurables.configure(sink, tmpctx);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = EventBuilder.withBody(
+            Bytes.toBytes(valBase));
+    channel.put(e);
+    tx.commit();
+    tx.close();
+
+    sink.process();
+    sink.stop();
+    HTable table = new HTable(testUtility.getConfiguration(), tableName);
+    byte[][] results = getResults(table, 1);
+    byte[] out = results[0];
+    Assert.assertArrayEquals(e.getBody(), out);
+    out = results[1];
+    Assert.assertArrayEquals(Longs.toByteArray(1), out);
+    testUtility.deleteTable(tableName.getBytes());
+  }
+
+  @Test
   public void testOneEvent() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink(testUtility.getConfiguration());


Mime
View raw message