flume-commits mailing list archives

From br...@apache.org
Subject git commit: FLUME-1657: Regex Extractor Interceptor
Date Thu, 15 Nov 2012 15:50:59 GMT
Updated Branches:
  refs/heads/flume-1.3.0 5483e5c0a -> f68ab7124


FLUME-1657: Regex Extractor Interceptor

(Cameron Gandevia via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f68ab712
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f68ab712
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f68ab712

Branch: refs/heads/flume-1.3.0
Commit: f68ab7124bd9c98d701e738bdd6c2c144dae19b6
Parents: 5483e5c
Author: Brock Noland <brock@apache.org>
Authored: Thu Nov 15 09:49:52 2012 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Thu Nov 15 09:50:21 2012 -0600

----------------------------------------------------------------------
 .../apache/flume/interceptor/InterceptorType.java  |    5 +-
 .../interceptor/RegexExtractorInterceptor.java     |  223 +++++++++++++++
 .../RegexExtractorInterceptorMillisSerializer.java |   56 ++++
 ...xExtractorInterceptorPassThroughSerializer.java |   44 +++
 .../RegexExtractorInterceptorSerializer.java       |   37 +++
 .../interceptor/TestRegexExtractorInterceptor.java |  148 ++++++++++
 ...tRegexExtractorInterceptorMillisSerializer.java |   61 ++++
 ...xExtractorInterceptorPassThroughSerializer.java |   34 +++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   47 +++
 9 files changed, 653 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f68ab712/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
index c478337..c84cea5 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
@@ -23,8 +23,9 @@ public enum InterceptorType {
   TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class),
   HOST(org.apache.flume.interceptor.HostInterceptor.Builder.class),
   STATIC(org.apache.flume.interceptor.StaticInterceptor.Builder.class),
-  REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class)
-  ;
+  REGEX_FILTER(
+      org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class),
+  REGEX_EXTRACTOR(org.apache.flume.interceptor.RegexExtractorInterceptor.Builder.class);
 
   private final Class<? extends Interceptor.Builder> builderClass;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/f68ab712/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
new file mode 100644
index 0000000..d9c3762
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * Interceptor that extracts match groups using a specified regular expression
+ * and appends them to the event headers using the specified serializers.
+ * <p>
+ * All regular expression matching is done with Java's built-in
+ * {@code java.util.regex} package, and only the first match in the event body
+ * is used. Properties:
+ * <p>
+ * regex: The regular expression to match against the event body
+ * <p>
+ * serializers: Comma separated list of headerName[:fully qualified serializer
+ * class]. Match group N is written to the Nth header name; if no serializer
+ * class is specified, the default
+ * {@link RegexExtractorInterceptorPassThroughSerializer} is used. Groups
+ * without a corresponding header name are skipped.
+ * <p>
+ * Sample config:
+ *
+ * <pre>
+ *   agent.sources.r1.channels = c1
+ *   agent.sources.r1.type = SEQ
+ *   agent.sources.r1.interceptors = i1
+ *   agent.sources.r1.interceptors.i1.type = REGEX_EXTRACTOR
+ *   agent.sources.r1.interceptors.i1.regex = ^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)(:\\d\\d,\\d\\d\\d)
+ *   agent.sources.r1.interceptors.i1.serializers = timestamp:org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer,data
+ *   agent.sources.r1.interceptors.i1.org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer.pattern = yyyy-MM-dd HH:mm
+ * </pre>
+ *
+ * Example 1:
+ *
+ * <pre>
+ * EventBody: 1:2:3.4foobar5
+ *
+ * Configuration:
+ * agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
+ * agent.sources.r1.interceptors.i1.serializers = one,two,three
+ *
+ * results in an event with the following
+ *
+ * body: 1:2:3.4foobar5 headers: one=>1, two=>2, three=>3
+ * </pre>
+ *
+ * Example 2:
+ *
+ * <pre>
+ * EventBody: 1:2:3.4foobar5
+ *
+ * Configuration:
+ * agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
+ * agent.sources.r1.interceptors.i1.serializers = one,two
+ *
+ * results in an event with the following
+ *
+ * body: 1:2:3.4foobar5 headers: one=>1, two=>2
+ * </pre>
+ */
+public class RegexExtractorInterceptor implements Interceptor {
+
+  static final String REGEX = "regex";
+  static final String SERIALIZERS = "serializers";
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(RegexExtractorInterceptor.class);
+
+  private final Pattern regex;
+  private final List<NameAndSerializer> serializers;
+
+  private RegexExtractorInterceptor(Pattern regex,
+      List<NameAndSerializer> serializers) {
+    this.regex = regex;
+    this.serializers = serializers;
+  }
+
+  @Override
+  public void initialize() {
+    // NO-OP...
+  }
+
+  @Override
+  public void close() {
+    // NO-OP...
+  }
+
+  @Override
+  public Event intercept(Event event) {
+    Matcher matcher = regex.matcher(new String(event.getBody()));
+    Map<String, String> headers = event.getHeaders();
+    if (matcher.find()) {
+      for (int group = 0, count = matcher.groupCount(); group < count; group++) {
+        int groupIndex = group + 1;
+        if (groupIndex > serializers.size()) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Skipping group {} to {} due to missing serializer",
+                groupIndex, count);
+          }
+          break;
+        }
+        NameAndSerializer serializer = serializers.get(group);
+        if (logger.isDebugEnabled()) {
+          logger.debug("Serializing {} using {}", serializer.headerName,
+              serializer.serializer);
+        }
+        headers.put(serializer.headerName,
+            serializer.serializer.serialize(matcher.group(groupIndex)));
+      }
+    }
+    return event;
+  }
+
+  @Override
+  public List<Event> intercept(List<Event> events) {
+    List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
+    for (Event event : events) {
+      Event interceptedEvent = intercept(event);
+      if (interceptedEvent != null) {
+        intercepted.add(interceptedEvent);
+      }
+    }
+    return intercepted;
+  }
+
+  public static class Builder implements Interceptor.Builder {
+
+    private Pattern regex;
+    private List<NameAndSerializer> serializerList;
+    private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
+
+    @Override
+    public void configure(Context context) {
+      String regexString = context.getString(REGEX);
+      Preconditions.checkArgument(!StringUtils.isEmpty(regexString),
+          "Must supply a valid regex string");
+      regex = Pattern.compile(regexString);
+
+      configureSerializers(context);
+    }
+
+    private void configureSerializers(Context context) {
+      String serializerString = context.getString(SERIALIZERS);
+      Preconditions.checkArgument(!StringUtils.isEmpty(serializerString),
+          "Must supply at least one name and serializer");
+
+      String[] nameAndSerializerList = serializerString.split(",");
+      serializerList = Lists
+          .newArrayListWithCapacity(nameAndSerializerList.length);
+      for (String nameAndSerializer : nameAndSerializerList) {
+        String[] splitNameAndSerializer = nameAndSerializer.split(":");
+        String name = splitNameAndSerializer[0].trim();
+        if (splitNameAndSerializer.length > 1) {
+          String serializer = splitNameAndSerializer[1].trim();
+          serializerList.add(new NameAndSerializer(name, getCustomSerializer(
+              serializer, context)));
+        } else {
+          serializerList.add(new NameAndSerializer(name, defaultSerializer));
+        }
+      }
+    }
+
+    private RegexExtractorInterceptorSerializer getCustomSerializer(
+        String clazzName, Context context) {
+      try {
+        Context serializerContext = new Context();
+        serializerContext.putAll(context.getSubProperties(clazzName + "."));
+        RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class
+            .forName(clazzName).newInstance();
+        serializer.configure(serializerContext);
+        return serializer;
+      } catch (Exception e) {
+        logger.error("Could not instantiate event serializer.", e);
+        // Throwables.propagate always throws; rethrowing makes that explicit.
+        throw Throwables.propagate(e);
+      }
+    }
+
+    @Override
+    public Interceptor build() {
+      Preconditions.checkArgument(regex != null,
+          "Regex pattern was misconfigured");
+      Preconditions.checkArgument(serializerList.size() > 0,
+          "Must supply a valid group match id list");
+      return new RegexExtractorInterceptor(regex, serializerList);
+    }
+  }
+
+  static class NameAndSerializer {
+    private final String headerName;
+    private final RegexExtractorInterceptorSerializer serializer;
+
+    public NameAndSerializer(String headerName,
+        RegexExtractorInterceptorSerializer serializer) {
+      this.headerName = headerName;
+      this.serializer = serializer;
+    }
+  }
+}
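To make the group-to-header mapping in `intercept()` easier to follow, here is a standalone sketch that reproduces just that loop with `java.util.regex`, outside Flume. `RegexExtractSketch` and `extract` are hypothetical names; the sketch returns a plain header map and applies no serializer, which is equivalent to using the pass-through serializer.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexExtractSketch {

  // Mirrors the loop in RegexExtractorInterceptor.intercept(): group N (1-based)
  // is stored under headerNames.get(N - 1); groups beyond the list are skipped.
  static Map<String, String> extract(Pattern regex, List<String> headerNames,
      String body) {
    Map<String, String> headers = new LinkedHashMap<>();
    Matcher matcher = regex.matcher(body);
    if (matcher.find()) { // only the first match in the body is used
      int count = Math.min(matcher.groupCount(), headerNames.size());
      for (int group = 1; group <= count; group++) {
        headers.put(headerNames.get(group - 1), matcher.group(group));
      }
    }
    return headers;
  }

  public static void main(String[] args) {
    // Java string "(\\d)" is the regex (\d), like the properties-file examples.
    Pattern regex = Pattern.compile("(\\d):(\\d):(\\d)");
    System.out.println(extract(regex, List.of("one", "two", "three"), "1:2:3.4foobar5"));
    System.out.println(extract(regex, List.of("one", "two"), "1:2:3.4foobar5"));
  }
}
```

Capping the loop at the number of header names has the same effect as the `break` in the original: surplus match groups never reach a serializer.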

http://git-wip-us.apache.org/repos/asf/flume/blob/f68ab712/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java
new file mode 100644
index 0000000..83bf0c9
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Serializer that converts the passed in value into milliseconds using the
+ * specified formatting pattern
+ */
+public class RegexExtractorInterceptorMillisSerializer implements
+    RegexExtractorInterceptorSerializer {
+
+  private DateTimeFormatter formatter;
+
+  @Override
+  public void configure(Context context) {
+    String pattern = context.getString("pattern");
+    Preconditions.checkArgument(!StringUtils.isEmpty(pattern),
+        "Must configure with a valid pattern");
+    formatter = DateTimeFormat.forPattern(pattern);
+  }
+
+  @Override
+  public String serialize(String value) {
+    DateTime dateTime = formatter.parseDateTime(value);
+    return Long.toString(dateTime.getMillis());
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // NO-OP...
+  }
+}
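`RegexExtractorInterceptorMillisSerializer` parses with a Joda-Time formatter that carries no explicit zone, so the result depends on the JVM's default time zone. The sketch below illustrates that dependence using `java.time` instead of Joda-Time (a swapped-in library, so pattern-letter semantics may differ for unusual patterns); `MillisSketch` and `toMillis` are hypothetical names, and the zone is passed explicitly rather than taken from the JVM default.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class MillisSketch {

  // Parses value with the given pattern, interprets the resulting local
  // date-time in the supplied zone, and returns epoch milliseconds as a string.
  static String toMillis(String value, String pattern, ZoneId zone) {
    LocalDateTime local =
        LocalDateTime.parse(value, DateTimeFormatter.ofPattern(pattern));
    return Long.toString(local.atZone(zone).toInstant().toEpochMilli());
  }

  public static void main(String[] args) {
    String value = "2012-10-17 14:34";
    String pattern = "yyyy-MM-dd HH:mm";
    System.out.println(toMillis(value, pattern, ZoneId.of("America/Los_Angeles")));
    System.out.println(toMillis(value, pattern, ZoneId.of("UTC")));
  }
}
```

Under `America/Los_Angeles`, `2012-10-17 14:34` yields `1350509640000`, the value `TestRegexExtractorInterceptor` in this commit expects, while under UTC it yields `1350484440000`; the hard-coded expectations in the tests therefore only hold when the build runs in US Pacific time.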

http://git-wip-us.apache.org/repos/asf/flume/blob/f68ab712/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java
new file mode 100644
index 0000000..cecf631
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.ComponentConfiguration;
+
+/**
+ * Serializer that simply returns the passed in value
+ */
+public class RegexExtractorInterceptorPassThroughSerializer implements
+    RegexExtractorInterceptorSerializer {
+
+  @Override
+  public String serialize(String value) {
+    return value;
+  }
+
+  @Override
+  public void configure(Context context) {
+    // NO-OP...
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // NO-OP...
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/f68ab712/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java
new file mode 100644
index 0000000..6cca098
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+
+/**
+ * Serializer for serializing groups matched by the
+ * {@link RegexExtractorInterceptor}
+ */
+public interface RegexExtractorInterceptorSerializer extends Configurable,
+    ConfigurableComponent {
+
+  /**
+   * @param value
+   *          The value extracted by the {@link RegexExtractorInterceptor}
+   * @return The serialized version of the specified value
+   */
+  String serialize(String value);
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/f68ab712/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java
new file mode 100644
index 0000000..e03e9e2
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.interceptor.Interceptor.Builder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+public class TestRegexExtractorInterceptor {
+
+  private Builder fixtureBuilder;
+
+  @Before
+  public void init() throws Exception {
+    fixtureBuilder = InterceptorBuilderFactory
+        .newInstance(InterceptorType.REGEX_EXTRACTOR.toString());
+  }
+
+  @Test
+  public void shouldNotAllowConfigurationWithoutRegex() throws Exception {
+    try {
+      fixtureBuilder.build();
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Pass...
+    }
+  }
+
+  @Test
+  public void shouldNotAllowConfigurationWithIllegalRegex() throws Exception {
+    try {
+      Context context = new Context();
+      context.put(RegexExtractorInterceptor.REGEX, "?&?&&&?&?&?&&&??");
+      fixtureBuilder.configure(context);
+      fixtureBuilder.build();
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Pass...
+    }
+  }
+
+  @Test
+  public void shouldNotAllowConfigurationWithoutMatchIds() throws Exception {
+    try {
+      Context context = new Context();
+      context.put(RegexExtractorInterceptor.REGEX, ".*");
+      context.put(RegexExtractorInterceptor.SERIALIZERS, "");
+      fixtureBuilder.configure(context);
+      fixtureBuilder.build();
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Pass...
+    }
+  }
+
+  @Test
+  public void shouldExtractAddHeadersForAllMatchGroups() throws Exception {
+    Context context = new Context();
+    context.put(RegexExtractorInterceptor.REGEX, "(\\d):(\\d):(\\d)");
+    context.put(RegexExtractorInterceptor.SERIALIZERS, "Num1,Num2,Num3");
+    fixtureBuilder.configure(context);
+    Interceptor fixture = fixtureBuilder.build();
+
+    Event event = EventBuilder.withBody("1:2:3.4foobar5", Charsets.UTF_8);
+
+    Event expected = EventBuilder.withBody("1:2:3.4foobar5", Charsets.UTF_8);
+    expected.getHeaders().put("Num1", "1");
+    expected.getHeaders().put("Num2", "2");
+    expected.getHeaders().put("Num3", "3");
+
+    Event actual = fixture.intercept(event);
+
+    Assert.assertArrayEquals(expected.getBody(), actual.getBody());
+    Assert.assertEquals(expected.getHeaders(), actual.getHeaders());
+  }
+
+  @Test
+  public void shouldExtractAddHeadersForAllMatchGroupsIgnoringMissingIds()
+      throws Exception {
+    String body = "2012-10-17 14:34:44,338";
+    Context context = new Context();
+    // Skip the second group
+    context.put(RegexExtractorInterceptor.REGEX,
+        "^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)(:\\d\\d,\\d\\d\\d)");
+    context.put(RegexExtractorInterceptor.SERIALIZERS, "timestamp");
+
+    fixtureBuilder.configure(context);
+    Interceptor fixture = fixtureBuilder.build();
+
+    Event event = EventBuilder.withBody(body, Charsets.UTF_8);
+    Event expected = EventBuilder.withBody(body, Charsets.UTF_8);
+    expected.getHeaders().put("timestamp", "2012-10-17 14:34");
+
+    Event actual = fixture.intercept(event);
+
+    Assert.assertArrayEquals(expected.getBody(), actual.getBody());
+    Assert.assertEquals(expected.getHeaders(), actual.getHeaders());
+
+  }
+
+  @Test
+  public void shouldExtractAddHeadersUsingSpecifiedSerializer()
+      throws Exception {
+    String body = "2012-10-17 14:34:44,338";
+    Context context = new Context();
+    // Skip the second group
+    context.put(RegexExtractorInterceptor.REGEX,
+        "^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)(:\\d\\d,\\d\\d\\d)");
+    context.put(RegexExtractorInterceptor.SERIALIZERS, "timestamp:"
+        + RegexExtractorInterceptorMillisSerializer.class.getName() + ",data");
+    context.put(RegexExtractorInterceptorMillisSerializer.class.getName()
+        + ".pattern", "yyyy-MM-dd HH:mm");
+
+    fixtureBuilder.configure(context);
+    Interceptor fixture = fixtureBuilder.build();
+
+    Event event = EventBuilder.withBody(body, Charsets.UTF_8);
+    Event expected = EventBuilder.withBody(body, Charsets.UTF_8);
+    expected.getHeaders().put("timestamp", "1350509640000");
+    expected.getHeaders().put("data", ":44,338");
+
+    Event actual = fixture.intercept(event);
+
+    Assert.assertArrayEquals(expected.getBody(), actual.getBody());
+    Assert.assertEquals(expected.getHeaders(), actual.getHeaders());
+  }
+}
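The last test above selects a serializer by its fully qualified class name and hands it a `pattern` setting. The standalone sketch below, with hypothetical class and method names, reproduces the two steps the Builder performs for that: splitting the `serializers` value into ordered (header name, serializer class) pairs, and collecting the properties prefixed with a serializer's class name and a dot, with the prefix removed (the behaviour the Builder obtains from `Context.getSubProperties`). It assumes, as the tests do, that the interceptor sees keys relative to `agent.sources.r1.interceptors.i1.`, and it only resolves class names rather than instantiating serializers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SerializerConfigSketch {

  static final String PASS_THROUGH =
      "org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer";

  // Splits "name[:fqcn],name[:fqcn],..." into ordered (header name, class)
  // pairs, falling back to the pass-through serializer when no class is given.
  static List<Map.Entry<String, String>> parseSerializers(String spec) {
    List<Map.Entry<String, String>> result = new ArrayList<>();
    for (String entry : spec.split(",")) {
      String[] parts = entry.split(":");
      String clazz = parts.length > 1 ? parts[1].trim() : PASS_THROUGH;
      result.add(Map.entry(parts[0].trim(), clazz));
    }
    return result;
  }

  // Returns the properties whose keys start with prefix, with prefix removed.
  static Map<String, String> subProperties(Map<String, String> props, String prefix) {
    Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : props.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        result.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    String millis = "org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer";
    // Keys as the interceptor sees them (relative to ...interceptors.i1.).
    Map<String, String> props = new LinkedHashMap<>();
    props.put("regex", "^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)(:\\d\\d,\\d\\d\\d)");
    props.put("serializers", "timestamp:" + millis + ",data");
    props.put(millis + ".pattern", "yyyy-MM-dd HH:mm");

    for (Map.Entry<String, String> s : parseSerializers(props.get("serializers"))) {
      // timestamp gets {pattern=yyyy-MM-dd HH:mm}; data gets no settings.
      System.out.println(s.getKey() + " -> " + s.getValue() + " "
          + subProperties(props, s.getValue() + "."));
    }
  }
}
```

Because the serializer settings are looked up by class-name prefix, two headers that use the same serializer class necessarily share its settings in this design.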

http://git-wip-us.apache.org/repos/asf/flume/blob/f68ab712/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
new file mode 100644
index 0000000..1f87d9a
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import junit.framework.Assert;
+
+import org.apache.flume.Context;
+import org.junit.Test;
+
+public class TestRegexExtractorInterceptorMillisSerializer {
+
+  @Test
+  public void shouldRequirePatternInConfiguration() {
+    try {
+      RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer();
+      fixture.configure(new Context());
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Expected...
+    }
+  }
+
+  @Test
+  public void shouldRequireValidPatternInConfiguration() {
+    try {
+      RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer();
+      Context context = new Context();
+      context.put("pattern", "ABCDEFG");
+      fixture.configure(context);
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Expected...
+    }
+  }
+
+  @Test
+  public void shouldReturnMillisFromPattern() {
+    RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer();
+    Context context = new Context();
+    context.put("pattern", "yyyy-MM-dd HH:mm:ss");
+    fixture.configure(context);
+
+    Assert.assertEquals("1269616953000",
+        fixture.serialize("2010-03-26 08:22:33"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/f68ab712/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
new file mode 100644
index 0000000..569c274
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import junit.framework.Assert;
+
+import org.apache.flume.Context;
+import org.junit.Test;
+
+public class TestRegexExtractorInterceptorPassThroughSerializer {
+
+  @Test
+  public void shouldReturnSameValue() {
+    RegexExtractorInterceptorPassThroughSerializer fixture = new RegexExtractorInterceptorPassThroughSerializer();
+    fixture.configure(new Context());
+    String input = "testing (1,2,3,4)";
+    Assert.assertEquals(input, fixture.serialize(input));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/f68ab712/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 0e6fdcb..303ec71 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2095,6 +2095,53 @@ regex             ".*"     Regular expression for matching against events
 excludeEvents     false    If true, regex determines events to exclude, otherwise regex determines events to include.
 ================  =======  ========================================================================
 
+Regex Extractor Interceptor
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This interceptor extracts match groups using a specified regular expression and appends them as headers on the event.
+Only the first match in the event body is used. It also supports pluggable serializers for formatting the match groups
+before they are added as event headers.
+
+================  ============================== ================================================================================
+Property Name     Default                        Description
+================  ============================== ================================================================================
+**type**          --                             The component type name has to be ``REGEX_EXTRACTOR``
+**regex**         --                             Regular expression for matching against events
+**serializers**   --                             Comma separated list of ``headerName[:serializerFQCN]`` entries, one per match group (see examples below).
+                                                 The following serializers are supported out of the box:
+                                                 ``org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer`` (default)
+                                                 ``org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer``
+================  ============================== ================================================================================
+
+The serializers map each match group to a header name and a formatted header value: match group N goes to the Nth
+entry in ``serializers``, and groups without an entry are skipped. By default you only need to specify the header name,
+and ``org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer`` will be used; it maps the match to
+the specified header name and passes the value through exactly as the regex extracted it.
+You can plug custom serializer implementations into the extractor by their fully qualified class name (FQCN) to format
+the matches in any way you like. A serializer receives every interceptor property whose name starts with its FQCN
+followed by a dot, with that prefix removed (see the ``pattern`` setting in Example 2).
+
+Example 1:
+~~~~~~~~~~
+
+If the Flume event body contained ``1:2:3.4foobar5`` and the following configuration was used
+
+.. code-block:: properties
+
+  agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
+  agent.sources.r1.interceptors.i1.serializers = one,two,three
+
+the event will keep the same body, and the following headers will be added: ``one=>1, two=>2, three=>3``
+
+Example 2:
+~~~~~~~~~~
+
+If the Flume event body contained ``2012-10-18 18:47:57,614 some log line`` and the following configuration was used
+
+.. code-block:: properties
+
+  agent.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
+  agent.sources.r1.interceptors.i1.serializers = timestamp:org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
+  agent.sources.r1.interceptors.i1.org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer.pattern = yyyy-MM-dd HH:mm
+
+the event will keep the same body, and the header ``timestamp=>1350611220000`` will be added. The serializer interprets
+the time in the agent JVM's default time zone, so this value corresponds to US Pacific daylight time (UTC-7) and differs elsewhere.
+
 Flume Properties
 ----------------
 

