flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1945: HBase Serializer allow key from regular expression group
Date Tue, 04 Jun 2013 04:45:22 GMT
Updated Branches:
  refs/heads/trunk d63c378b5 -> e442c29a6


FLUME-1945: HBase Serializer allow key from regular expression group

(Sravya Tirukkovalur via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e442c29a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e442c29a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e442c29a

Branch: refs/heads/trunk
Commit: e442c29a646723a75d67fe4dd189cc304dfd4af4
Parents: d63c378
Author: Brock Noland <brock@apache.org>
Authored: Mon Jun 3 21:44:53 2013 -0700
Committer: Brock Noland <brock@apache.org>
Committed: Mon Jun 3 21:44:53 2013 -0700

----------------------------------------------------------------------
 .../sink/hbase/RegexHbaseEventSerializer.java      |   59 +++++++++++----
 .../sink/hbase/TestRegexHbaseEventSerializer.java  |   36 ++++++++-
 2 files changed, 75 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e442c29a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
index 27974d9..7d2b8b7 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
@@ -56,15 +56,21 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
   /** Regular expression used to parse groups from event data. */
   public static final String REGEX_CONFIG = "regex";
   public static final String REGEX_DEFAULT = "(.*)";
-  
+
   /** Whether to ignore case when performing regex matches. */
   public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase";
   public static final boolean INGORE_CASE_DEFAULT = false;
-  
+
   /** Comma separated list of column names to place matching groups in. */
   public static final String COL_NAME_CONFIG = "colNames";
   public static final String COLUMN_NAME_DEFAULT = "payload";
 
+  /** Index of the row key in matched regex groups */
+  public static final String ROW_KEY_INDEX_CONFIG = "rowKeyIndex";
+
+  /** Placeholder in colNames for row key */
+  public static final String ROW_KEY_NAME = "ROW_KEY";
+
   /** Whether to deposit event headers into corresponding column qualifiers */
   public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders";
   public static final boolean DEPOSIT_HEADERS_DEFAULT = false;
@@ -72,12 +78,12 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
   /** What charset to use when serializing into HBase's byte arrays */
   public static final String CHARSET_CONFIG = "charset";
   public static final String CHARSET_DEFAULT = "UTF-8";
-  
+
   /* This is a nonce used in HBase row-keys, such that the same row-key
    * never gets written more than once from within this JVM. */
   protected static final AtomicInteger nonce = new AtomicInteger(0);
   protected static String randomKey = RandomStringUtils.randomAlphanumeric(10);
-  
+
   protected byte[] cf;
   private byte[] payload;
   private List<byte[]> colNames = Lists.newArrayList();
@@ -86,11 +92,12 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
   private boolean depositHeaders;
   private Pattern inputPattern;
   private Charset charset;
-  
+  private int rowKeyIndex;
+
   @Override
   public void configure(Context context) {
     String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);
-    regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, 
+    regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG,
         INGORE_CASE_DEFAULT);
     depositHeaders = context.getBoolean(DEPOSIT_HEADERS_CONFIG,
         DEPOSIT_HEADERS_DEFAULT);
@@ -98,12 +105,26 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
         + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));
     charset = Charset.forName(context.getString(CHARSET_CONFIG,
         CHARSET_DEFAULT));
-    
+
     String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);
     String[] columnNames = colNameStr.split(",");
-    for (String s: columnNames) { 
+    for (String s: columnNames) {
       colNames.add(s.getBytes(charset));
     }
+
+    //Rowkey is optional, default is -1
+    rowKeyIndex = context.getInteger(ROW_KEY_INDEX_CONFIG, -1);
+    //if row key is being used, make sure it is specified correctly
+    if(rowKeyIndex >=0){
+      if(rowKeyIndex >= columnNames.length) {
+        throw new IllegalArgumentException(ROW_KEY_INDEX_CONFIG + " must be " +
+          "less than num columns " + columnNames.length);
+      }
+      if(!ROW_KEY_NAME.equalsIgnoreCase(columnNames[rowKeyIndex])) {
+        throw new IllegalArgumentException("Column at " + rowKeyIndex + " must be "
+        + ROW_KEY_NAME + " and is " + columnNames[rowKeyIndex]);
+      }
+    }
   }
 
   @Override
@@ -116,7 +137,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
     this.payload = event.getBody();
     this.cf = columnFamily;
   }
-  
+
   /**
    * Returns a row-key with the following format:
    * [time in millis]-[random key]-[nonce]
@@ -141,11 +162,11 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
         randomKey, nonce.getAndIncrement());
     return rowKey.getBytes(charset);
   }
-  
+
   protected byte[] getRowKey() {
     return getRowKey(Calendar.getInstance());
   }
-  
+
   @Override
   public List<Row> getActions() throws FlumeException {
     List<Row> actions = Lists.newArrayList();
@@ -154,17 +175,23 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
     if (!m.matches()) {
       return Lists.newArrayList();
     }
-    
+
     if (m.groupCount() != colNames.size()) {
       return Lists.newArrayList();
     }
-    
+
     try {
-      rowKey = getRowKey();
+      if(rowKeyIndex < 0){
+        rowKey = getRowKey();
+      }else{
+        rowKey = m.group(rowKeyIndex + 1).getBytes(Charsets.UTF_8);
+      }
       Put put = new Put(rowKey);
-      
+
       for (int i = 0; i < colNames.size(); i++) {
-        put.add(cf, colNames.get(i), m.group(i + 1).getBytes(charset));
+        if(i != rowKeyIndex) {
+          put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
+        }
       }
       if (depositHeaders) {
         for (Map.Entry<String, String> entry : headers.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flume/blob/e442c29a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
index 191dc54..b102b49 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
@@ -72,15 +72,43 @@ public class TestRegexHbaseEventSerializer {
     assertEquals("The sky is falling!",
         resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT));
   }
-  
+  /** Verifies that the group at rowKeyIndex becomes the row key, not a cell. */
+  @Test
+  public void testRowIndexKey() throws Exception {
+    RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
+    Context context = new Context();
+    context.put(RegexHbaseEventSerializer.REGEX_CONFIG, "^([^\t]+)\t([^\t]+)\t" +
+      "([^\t]+)$");
+    context.put(RegexHbaseEventSerializer.COL_NAME_CONFIG, "col1,col2,ROW_KEY");
+    context.put(RegexHbaseEventSerializer.ROW_KEY_INDEX_CONFIG, "2");
+    s.configure(context);
+
+    String body = "val1\tval2\trow1";
+    Event e = EventBuilder.withBody(Bytes.toBytes(body));
+    s.initialize(e, Bytes.toBytes("CF"));
+    List<Row> actions = s.getActions();
+    assertEquals(1, actions.size());
+    Put put = (Put) actions.get(0);
+    assertEquals("row1", Bytes.toString(put.getRow()));
+    List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf);
+    assertEquals(2, kvPairs.size());
+    Map<String, String> resultMap = Maps.newHashMap();
+    for (KeyValue kv : kvPairs) {
+      resultMap.put(Bytes.toString(kv.getQualifier()), Bytes.toString(kv.getValue()));
+    }
+    assertEquals("val1", resultMap.get("col1"));
+    assertEquals("val2", resultMap.get("col2"));
+    assertTrue(!resultMap.containsKey("ROW_KEY"));
+  }
+
   @Test
   /** Test a common case where regex is used to parse apache log format. */
   public void testApacheRegex() throws Exception {
     RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
     Context context = new Context();
-    context.put(RegexHbaseEventSerializer.REGEX_CONFIG, 
-        "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) \"([^ ]+) ([^ ]+)" + 
-        " ([^\"]+)\" (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\")" + 
+    context.put(RegexHbaseEventSerializer.REGEX_CONFIG,
+        "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) \"([^ ]+) ([^ ]+)" +
+        " ([^\"]+)\" (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\")" +
         " ([^ \"]*|\"[^\"]*\"))?");
     context.put(RegexHbaseEventSerializer.COL_NAME_CONFIG,
         "host,identity,user,time,method,request,protocol,status,size," +


Mime
View raw message