Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 874D1DECC for ; Tue, 21 May 2013 03:14:47 +0000 (UTC) Received: (qmail 21427 invoked by uid 500); 21 May 2013 03:14:47 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 21215 invoked by uid 500); 21 May 2013 03:14:40 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 21166 invoked by uid 99); 21 May 2013 03:14:39 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 May 2013 03:14:39 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3ACD8890D68; Tue, 21 May 2013 03:14:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hshreedharan@apache.org To: commits@flume.apache.org Message-Id: <8367ceeec548488c96ff78e91955d62c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-1757. Improve configuration of hbase serializers. Date: Tue, 21 May 2013 03:14:39 +0000 (UTC) Updated Branches: refs/heads/trunk a13e9e6a8 -> 669e5d327 FLUME-1757. Improve configuration of hbase serializers. (Sravya Tirukkovalur via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/669e5d32 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/669e5d32 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/669e5d32 Branch: refs/heads/trunk Commit: 669e5d327720021391245949d6a947b4e963b728 Parents: a13e9e6 Author: Hari Shreedharan Authored: Mon May 20 20:12:38 2013 -0700 Committer: Hari Shreedharan Committed: Mon May 20 20:12:38 2013 -0700 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 +- .../hbase/SimpleAsyncHbaseEventSerializer.java | 4 +- .../sink/hbase/SimpleHbaseEventSerializer.java | 4 +- .../flume/sink/hbase/TestAsyncHBaseSink.java | 42 ++++++++++++++- .../org/apache/flume/sink/hbase/TestHBaseSink.java | 41 +++++++++++++- 5 files changed, 84 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 2ee41be..1b4d216 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1697,7 +1697,7 @@ Property Name Default Desc **table** -- The name of the table in Hbase to write to. **columnFamily** -- The column family in Hbase to write to. batchSize 100 Number of events to be written per txn. -serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer +serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = "iCol", payload column = "pCol". serializer.* -- Properties to be passed to the serializer. kerberosPrincipal -- Kerberos user principal for accessing secure HBase kerberosKeytab -- Kerberos keytab for accessing secure HBase http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java index dd19616..96095d1 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java @@ -115,8 +115,8 @@ public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerialize @Override public void configure(Context context) { - String pCol = context.getString("payloadColumn"); - String iCol = context.getString("incrementColumn"); + String pCol = context.getString("payloadColumn", "pCol"); + String iCol = context.getString("incrementColumn", "iCol"); rowPrefix = context.getString("rowPrefix", "default"); String suffix = context.getString("suffix", "uuid"); if(pCol != null && !pCol.isEmpty()) { http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java index 52bc84d..758252b 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java @@ -70,8 +70,8 @@ public class SimpleHbaseEventSerializer implements HbaseEventSerializer { context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); String suffix = context.getString("suffix", "uuid"); - String payloadColumn = context.getString("payloadColumn"); - String incColumn = context.getString("incrementColumn"); + String payloadColumn = context.getString("payloadColumn","pCol"); + String incColumn = context.getString("incrementColumn","iCol"); if(payloadColumn != null && !payloadColumn.isEmpty()) { if(suffix.equals("timestamp")){ keyType = KeyType.TS; http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java index 03c3e4c..7ddfdae 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java @@ -69,8 +69,8 @@ public class TestAsyncHBaseSink { private static String tableName = "TestHbaseSink"; private static String columnFamily = "TestColumnFamily"; - private static String inColumn = "Increment"; - private static String plCol = "pc"; + private static String inColumn = "iCol"; + private static String plCol = "pCol"; private static Context ctx = new Context(); private static String valBase = "testing hbase sink: jham"; private boolean deleteTable = true; @@ -164,6 +164,44 @@ public class TestAsyncHBaseSink { } @Test + public void testOneEventWithDefaults() throws Exception { + Map ctxMap = new HashMap(); + ctxMap.put("table", tableName); + ctxMap.put("columnFamily", columnFamily); + ctxMap.put("serializer", + "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer"); + ctxMap.put("keep-alive", "0"); + ctxMap.put("timeout", "10000"); + Context tmpctx = new Context(); + tmpctx.putAll(ctxMap); + + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + deleteTable = true; + AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); + Configurables.configure(sink, tmpctx); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, tmpctx); + sink.setChannel(channel); + sink.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e = EventBuilder.withBody( + Bytes.toBytes(valBase)); + channel.put(e); + tx.commit(); + tx.close(); + Assert.assertFalse(sink.isConfNull()); + sink.process(); + sink.stop(); + HTable table = new HTable(testUtility.getConfiguration(), tableName); + byte[][] results = getResults(table, 1); + byte[] out = results[0]; + Assert.assertArrayEquals(e.getBody(), out); + out = results[1]; + Assert.assertArrayEquals(Longs.toByteArray(1), out); + } + + @Test public void testOneEvent() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); deleteTable = true; http://git-wip-us.apache.org/repos/asf/flume/blob/669e5d32/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java index ad94fc9..ab4128e 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java @@ -55,8 +55,8 @@ public class TestHBaseSink { private static HBaseTestingUtility testUtility = new HBaseTestingUtility(); private static String tableName = "TestHbaseSink"; private static String columnFamily = "TestColumnFamily"; - private static String inColumn = "Increment"; - private static String plCol = "pc"; + private static String inColumn = "iCol"; + private static String plCol = "pCol"; private static Context ctx = new Context(); private static String valBase = "testing hbase sink: jham"; @@ -80,6 +80,43 @@ public class TestHBaseSink { } @Test + public void testOneEventWithDefaults() throws Exception { + //Create a context without setting increment column and payload Column + Map ctxMap = new HashMap(); + ctxMap.put("table", tableName); + ctxMap.put("columnFamily", columnFamily); + ctxMap.put("serializer", + "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer"); + Context tmpctx = new Context(); + tmpctx.putAll(ctxMap); + + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + Configurables.configure(sink, tmpctx); + Channel channel = new MemoryChannel(); + Configurables.configure(channel, new Context()); + sink.setChannel(channel); + sink.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e = EventBuilder.withBody( + Bytes.toBytes(valBase)); + channel.put(e); + tx.commit(); + tx.close(); + + sink.process(); + sink.stop(); + HTable table = new HTable(testUtility.getConfiguration(), tableName); + byte[][] results = getResults(table, 1); + byte[] out = results[0]; + Assert.assertArrayEquals(e.getBody(), out); + out = results[1]; + Assert.assertArrayEquals(Longs.toByteArray(1), out); + testUtility.deleteTable(tableName.getBytes()); + } + + @Test public void testOneEvent() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); HBaseSink sink = new HBaseSink(testUtility.getConfiguration());