flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1657: Regex Extractor Interceptor
Date Thu, 15 Nov 2012 16:18:37 GMT
Updated Branches:
  refs/heads/trunk 310e70ee3 -> 9f9c04813


FLUME-1657: Regex Extractor Interceptor

(Cameron Gandevia via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9f9c0481
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9f9c0481
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9f9c0481

Branch: refs/heads/trunk
Commit: 9f9c04813573d18c8055582c4cddbbba95c8693b
Parents: 310e70e
Author: Brock Noland <brock@apache.org>
Authored: Thu Nov 15 10:17:22 2012 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Thu Nov 15 10:17:22 2012 -0600

----------------------------------------------------------------------
 .../apache/flume/interceptor/InterceptorType.java  |    5 +-
 .../interceptor/RegexExtractorInterceptor.java     |  245 +++++++++++++++
 .../RegexExtractorInterceptorMillisSerializer.java |   56 ++++
 ...xExtractorInterceptorPassThroughSerializer.java |   44 +++
 .../RegexExtractorInterceptorSerializer.java       |   37 +++
 .../interceptor/TestRegexExtractorInterceptor.java |  190 +++++++++++
 ...tRegexExtractorInterceptorMillisSerializer.java |   61 ++++
 ...xExtractorInterceptorPassThroughSerializer.java |   34 ++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   52 +++
 9 files changed, 722 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/9f9c0481/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
index c478337..c84cea5 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
@@ -23,8 +23,9 @@ public enum InterceptorType {
   TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class),
   HOST(org.apache.flume.interceptor.HostInterceptor.Builder.class),
   STATIC(org.apache.flume.interceptor.StaticInterceptor.Builder.class),
-  REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class)
-  ;
+  REGEX_FILTER(
+      org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class),
+  REGEX_EXTRACTOR(org.apache.flume.interceptor.RegexExtractorInterceptor.Builder.class);
 
   private final Class<? extends Interceptor.Builder> builderClass;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/9f9c0481/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
new file mode 100644
index 0000000..67cfc43
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * Interceptor that extracts matches using a specified regular expression and
+ * appends the matches to the event headers using the specified serializers</p>
+ * Note that all regular expression matching occurs through Java's built in
+ * java.util.regex package</p>. Properties:
+ * <p>
+ * regex: The regex to use
+ * <p>
+ * serializers: Specifies the group the serializer will be applied to, and the
+ * name of the header that will be added. If no serializer is specified for a
+ * group the default {@link RegexExtractorInterceptorPassThroughSerializer} will
+ * be used
+ * <p>
+ * Sample config:
+ * <p>
+ * agent.sources.r1.channels = c1
+ * <p>
+ * agent.sources.r1.type = SEQ
+ * <p>
+ * agent.sources.r1.interceptors = i1
+ * <p>
+ * agent.sources.r1.interceptors.i1.type = REGEX_EXTRACTOR
+ * <p>
+ * agent.sources.r1.interceptors.i1.regex = (WARNING)|(ERROR)|(FATAL)
+ * <p>
+ * agent.sources.r1.interceptors.i1.serializers = s1 s2
+ * agent.sources.r1.interceptors.i1.serializers.s1.type = com.blah.SomeSerializer
+ * agent.sources.r1.interceptors.i1.serializers.s1.name = warning
+ * agent.sources.r1.interceptors.i1.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
+ * agent.sources.r1.interceptors.i1.serializers.s2.name = error
+ * agent.sources.r1.interceptors.i1.serializers.s2.pattern = yyyy-MM-dd
+ * </code>
+ * </p>
+ * <pre>
+ * Example 1:
+ * </p>
+ * EventBody: 1:2:3.4foobar5</p> Configuration:
+ * agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
+ * </p>
+ * agent.sources.r1.interceptors.i1.serializers = s1 s2 s3
+ * agent.sources.r1.interceptors.i1.serializers.s1.name = one
+ * agent.sources.r1.interceptors.i1.serializers.s2.name = two
+ * agent.sources.r1.interceptors.i1.serializers.s3.name = three
+ * </p>
+ * results in an event with the following
+ *
+ * body: 1:2:3.4foobar5 headers: one=>1, two=>2, three=>3
+ *
+ * Example 2:
+ *
+ * EventBody: 1:2:3.4foobar5
+ *
+ * Configuration: agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
+ * <p>
+ * agent.sources.r1.interceptors.i1.serializers = s1 s2
+ * agent.sources.r1.interceptors.i1.serializers.s1.name = one
+ * agent.sources.r1.interceptors.i1.serializers.s2.name = two
+ * <p>
+ *
+ * results in an event with the following
+ *
+ * body: 1:2:3.4foobar5 headers: one=>1, two=>2
+ * </pre>
+ */
+public class RegexExtractorInterceptor implements Interceptor {
+
+  static final String REGEX = "regex";
+  static final String SERIALIZERS = "serializers";
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(RegexExtractorInterceptor.class);
+
+  private final Pattern regex;
+  private final List<NameAndSerializer> serializers;
+
+  private RegexExtractorInterceptor(Pattern regex,
+      List<NameAndSerializer> serializers) {
+    this.regex = regex;
+    this.serializers = serializers;
+  }
+
+  @Override
+  public void initialize() {
+    // NO-OP...
+  }
+
+  @Override
+  public void close() {
+    // NO-OP...
+  }
+
+  /**
+   * Matches the configured regex against the event body (decoded as UTF-8)
+   * and, on the first match, stores each capture group as a header using the
+   * serializer configured at the same position.
+   * <p>
+   * Groups beyond the number of configured serializers are ignored. A group
+   * that did not take part in the match (for example the unmatched branches
+   * of {@code (WARNING)|(ERROR)|(FATAL)}) has no value and is skipped, so
+   * {@code null} is never handed to a serializer or stored as a header value.
+   *
+   * @param event the event to inspect; its header map is modified in place
+   * @return the same event instance that was passed in
+   */
+  @Override
+  public Event intercept(Event event) {
+    Matcher matcher = regex.matcher(
+        new String(event.getBody(), Charsets.UTF_8));
+    Map<String, String> headers = event.getHeaders();
+    if (!matcher.find()) {
+      return event;
+    }
+
+    int groupCount = matcher.groupCount();
+    int usableGroups = Math.min(groupCount, serializers.size());
+    // Capture groups are 1-based; serializers are stored 0-based.
+    for (int groupIndex = 1; groupIndex <= usableGroups; groupIndex++) {
+      String value = matcher.group(groupIndex);
+      if (value == null) {
+        // Matcher.group returns null for a group that did not participate.
+        continue;
+      }
+      NameAndSerializer serializer = serializers.get(groupIndex - 1);
+      logger.debug("Serializing {} using {}", serializer.headerName,
+          serializer.serializer);
+      headers.put(serializer.headerName,
+          serializer.serializer.serialize(value));
+    }
+    if (usableGroups < groupCount) {
+      logger.debug("Skipping group {} to {} due to missing serializer",
+          usableGroups + 1, groupCount);
+    }
+    return event;
+  }
+
+  @Override
+  public List<Event> intercept(List<Event> events) {
+    List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
+    for (Event event : events) {
+      Event interceptedEvent = intercept(event);
+      if (interceptedEvent != null) {
+        intercepted.add(interceptedEvent);
+      }
+    }
+    return intercepted;
+  }
+
+  public static class Builder implements Interceptor.Builder {
+
+    private Pattern regex;
+    private List<NameAndSerializer> serializerList;
+    private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
+
+    /**
+     * Reads and compiles the {@code regex} property, then builds the
+     * serializer list from the {@code serializers} property.
+     *
+     * @param context the interceptor configuration
+     * @throws IllegalArgumentException if the regex or the serializer list is
+     *           missing or empty; an invalid expression is rejected by
+     *           {@link Pattern#compile(String)} with a PatternSyntaxException,
+     *           which is itself an IllegalArgumentException
+     */
+    @Override
+    public void configure(Context context) {
+      String regexString = context.getString(REGEX);
+      Preconditions.checkArgument(!StringUtils.isEmpty(regexString),
+          "Must supply a valid regex string");
+      // Pattern.compile already validates the expression; no extra probing
+      // of the compiled pattern is required.
+      regex = Pattern.compile(regexString);
+      configureSerializers(context);
+    }
+
+    private void configureSerializers(Context context) {
+      String serializerListStr = context.getString(SERIALIZERS);
+      Preconditions.checkArgument(!StringUtils.isEmpty(serializerListStr),
+          "Must supply at least one name and serializer");
+
+      String[] serializerNames = serializerListStr.split("\\s+");
+
+      Context serializerContexts =
+          new Context(context.getSubProperties(SERIALIZERS + "."));
+
+      serializerList = Lists.newArrayListWithCapacity(serializerNames.length);
+      for(String serializerName : serializerNames) {
+        Context serializerContext = new Context(
+            serializerContexts.getSubProperties(serializerName + "."));
+        String type = serializerContext.getString("type", "DEFAULT");
+        String name = serializerContext.getString("name");
+        Preconditions.checkArgument(!StringUtils.isEmpty(name),
+            "Supplied name cannot be empty.");
+
+        if("DEFAULT".equals(type)) {
+          serializerList.add(new NameAndSerializer(name, defaultSerializer));
+        } else {
+          serializerList.add(new NameAndSerializer(name, getCustomSerializer(
+              type, serializerContext)));
+        }
+      }
+    }
+
+    /**
+     * Instantiates the serializer class with the given name through its
+     * no-argument constructor and configures it.
+     *
+     * @param clazzName fully qualified name of a class implementing
+     *          {@link RegexExtractorInterceptorSerializer}
+     * @param context the properties to pass to the new serializer
+     * @return the configured serializer instance
+     * @throws RuntimeException if the class cannot be loaded, instantiated,
+     *           cast to the serializer interface, or configured; the original
+     *           failure is logged and rethrown via {@link Throwables#propagate}
+     */
+    private RegexExtractorInterceptorSerializer getCustomSerializer(
+        String clazzName, Context context) {
+      try {
+        RegexExtractorInterceptorSerializer serializer =
+            (RegexExtractorInterceptorSerializer) Class.forName(clazzName)
+                .newInstance();
+        serializer.configure(context);
+        return serializer;
+      } catch (Exception e) {
+        logger.error("Could not instantiate event serializer.", e);
+        // propagate() always throws; the explicit "throw" tells the compiler
+        // so and removes the misleading fall-through to the default serializer.
+        throw Throwables.propagate(e);
+      }
+    }
+
+    /**
+     * Creates the interceptor from the state set up by
+     * {@link #configure(Context)}.
+     *
+     * @return a new {@link RegexExtractorInterceptor}
+     * @throws IllegalArgumentException if no regex has been compiled or no
+     *           serializers have been configured
+     */
+    @Override
+    public Interceptor build() {
+      Preconditions.checkArgument(regex != null,
+          "Regex pattern was misconfigured");
+      // serializerList is still null when configure() failed after compiling
+      // the regex; report that as a configuration error, not as an NPE.
+      Preconditions.checkArgument(
+          serializerList != null && !serializerList.isEmpty(),
+          "Must supply a valid group match id list");
+      return new RegexExtractorInterceptor(regex, serializerList);
+    }
+  }
+
+  static class NameAndSerializer {
+    private final String headerName;
+    private final RegexExtractorInterceptorSerializer serializer;
+
+    public NameAndSerializer(String headerName,
+        RegexExtractorInterceptorSerializer serializer) {
+      this.headerName = headerName;
+      this.serializer = serializer;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9f9c0481/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java
b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java
new file mode 100644
index 0000000..83bf0c9
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Serializer that converts the passed in value into milliseconds using the
+ * specified formatting pattern
+ */
+public class RegexExtractorInterceptorMillisSerializer implements
+    RegexExtractorInterceptorSerializer {
+
+  private DateTimeFormatter formatter;
+
+  @Override
+  public void configure(Context context) {
+    String pattern = context.getString("pattern");
+    Preconditions.checkArgument(!StringUtils.isEmpty(pattern),
+        "Must configure with a valid pattern");
+    formatter = DateTimeFormat.forPattern(pattern);
+  }
+
+  @Override
+  public String serialize(String value) {
+    DateTime dateTime = formatter.parseDateTime(value);
+    return Long.toString(dateTime.getMillis());
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // NO-OP...
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9f9c0481/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java
b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java
new file mode 100644
index 0000000..cecf631
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.ComponentConfiguration;
+
+/**
+ * Serializer that simply returns the passed in value
+ */
+public class RegexExtractorInterceptorPassThroughSerializer implements
+    RegexExtractorInterceptorSerializer {
+
+  @Override
+  public String serialize(String value) {
+    return value;
+  }
+
+  @Override
+  public void configure(Context context) {
+    // NO-OP...
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // NO-OP...
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9f9c0481/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java
b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java
new file mode 100644
index 0000000..6cca098
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+
+/**
+ * Serializer for serializing groups matched by the
+ * {@link RegexExtractorInterceptor}
+ */
+public interface RegexExtractorInterceptorSerializer extends Configurable,
+    ConfigurableComponent {
+
+  /**
+   * @param value
+   *          The value extracted by the {@link RegexExtractorInterceptor}
+   * @return The serialized version of the specified value
+   */
+  String serialize(String value);
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9f9c0481/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java
b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java
new file mode 100644
index 0000000..61749c3
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.interceptor.Interceptor.Builder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+
+public class TestRegexExtractorInterceptor {
+
+  private Builder fixtureBuilder;
+
+  @Before
+  public void init() throws Exception {
+    fixtureBuilder = InterceptorBuilderFactory
+        .newInstance(InterceptorType.REGEX_EXTRACTOR.toString());
+  }
+
+  @Test
+  public void shouldNotAllowConfigurationWithoutRegex() throws Exception {
+    try {
+      fixtureBuilder.build();
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Pass...
+    }
+  }
+
+  @Test
+  public void shouldNotAllowConfigurationWithIllegalRegex() throws Exception {
+    try {
+      Context context = new Context();
+      context.put(RegexExtractorInterceptor.REGEX, "?&?&&&?&?&?&&&??");
+      fixtureBuilder.configure(context);
+      fixtureBuilder.build();
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Pass...
+    }
+  }
+
+  @Test
+  public void shouldNotAllowConfigurationWithoutMatchIds() throws Exception {
+    try {
+      Context context = new Context();
+      context.put(RegexExtractorInterceptor.REGEX, ".*");
+      context.put(RegexExtractorInterceptor.SERIALIZERS, "");
+      fixtureBuilder.configure(context);
+      fixtureBuilder.build();
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Pass...
+    }
+  }
+
+  @Test
+  public void shouldNotAllowMisconfiguredSerializers() throws Exception {
+    try {
+      Context context = new Context();
+      context.put(RegexExtractorInterceptor.REGEX, "(\\d):(\\d):(\\d)");
+      context.put(RegexExtractorInterceptor.SERIALIZERS, ",,,");
+      fixtureBuilder.configure(context);
+      fixtureBuilder.build();
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Pass...
+    }
+  }
+
+  @Test
+  public void shouldNotAllowEmptyNames() throws Exception {
+    try {
+      String space = " ";
+      Context context = new Context();
+      context.put(RegexExtractorInterceptor.REGEX, "(\\d):(\\d):(\\d)");
+      context.put(RegexExtractorInterceptor.SERIALIZERS,
+          Joiner.on(',').join(space, space, space));
+      fixtureBuilder.configure(context);
+      fixtureBuilder.build();
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Pass...
+    }
+  }
+
+  @Test
+  public void shouldExtractAddHeadersForAllMatchGroups() throws Exception {
+    Context context = new Context();
+    context.put(RegexExtractorInterceptor.REGEX, "(\\d):(\\d):(\\d)");
+    context.put(RegexExtractorInterceptor.SERIALIZERS, "s1 s2 s3");
+    context.put(RegexExtractorInterceptor.SERIALIZERS + ".s1.name", "Num1");
+    context.put(RegexExtractorInterceptor.SERIALIZERS + ".s2.name", "Num2");
+    context.put(RegexExtractorInterceptor.SERIALIZERS + ".s3.name", "Num3");
+
+    fixtureBuilder.configure(context);
+    Interceptor fixture = fixtureBuilder.build();
+
+    Event event = EventBuilder.withBody("1:2:3.4foobar5", Charsets.UTF_8);
+
+    Event expected = EventBuilder.withBody("1:2:3.4foobar5", Charsets.UTF_8);
+    expected.getHeaders().put("Num1", "1");
+    expected.getHeaders().put("Num2", "2");
+    expected.getHeaders().put("Num3", "3");
+
+    Event actual = fixture.intercept(event);
+
+    Assert.assertArrayEquals(expected.getBody(), actual.getBody());
+    Assert.assertEquals(expected.getHeaders(), actual.getHeaders());
+  }
+
+  @Test
+  public void shouldExtractAddHeadersForAllMatchGroupsIgnoringMissingIds()
+      throws Exception {
+    String body = "2012-10-17 14:34:44,338";
+    Context context = new Context();
+    // Skip the second group
+    context.put(RegexExtractorInterceptor.REGEX,
+        "^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)(:\\d\\d,\\d\\d\\d)");
+    context.put(RegexExtractorInterceptor.SERIALIZERS, "s1");
+    context
+        .put(RegexExtractorInterceptor.SERIALIZERS + ".s1.name", "timestamp");
+
+    fixtureBuilder.configure(context);
+    Interceptor fixture = fixtureBuilder.build();
+
+    Event event = EventBuilder.withBody(body, Charsets.UTF_8);
+    Event expected = EventBuilder.withBody(body, Charsets.UTF_8);
+    expected.getHeaders().put("timestamp", "2012-10-17 14:34");
+
+    Event actual = fixture.intercept(event);
+
+    Assert.assertArrayEquals(expected.getBody(), actual.getBody());
+    Assert.assertEquals(expected.getHeaders(), actual.getHeaders());
+
+  }
+
+  /**
+   * Verifies that a serializer selected by class name is applied to its group
+   * while a group without an explicit type uses the pass-through default.
+   */
+  @Test
+  public void shouldExtractAddHeadersUsingSpecifiedSerializer()
+      throws Exception {
+    String body = "2012-10-17 14:34:44,338";
+    String timestampPattern = "yyyy-MM-dd HH:mm";
+    Context context = new Context();
+    // Two groups, two serializers: s1 converts the timestamp to millis,
+    // s2 has no type and therefore uses the default pass-through serializer.
+    context.put(RegexExtractorInterceptor.REGEX,
+        "^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)(:\\d\\d,\\d\\d\\d)");
+    context.put(RegexExtractorInterceptor.SERIALIZERS, "s1 s2");
+
+    String millisSerializers = RegexExtractorInterceptorMillisSerializer.class.getName();
+    context.put(RegexExtractorInterceptor.SERIALIZERS + ".s1.type", millisSerializers);
+    context.put(RegexExtractorInterceptor.SERIALIZERS + ".s1.name", "timestamp");
+    context.put(RegexExtractorInterceptor.SERIALIZERS + ".s1.pattern", timestampPattern);
+
+    // Default type
+    context.put(RegexExtractorInterceptor.SERIALIZERS + ".s2.name", "data");
+
+    fixtureBuilder.configure(context);
+    Interceptor fixture = fixtureBuilder.build();
+
+    // The millis serializer parses without an explicit time zone, so a
+    // hard-coded epoch value only holds in one zone. Derive the expectation
+    // with the same formatter settings instead.
+    long expectedMillis = org.joda.time.format.DateTimeFormat
+        .forPattern(timestampPattern).parseMillis("2012-10-17 14:34");
+
+    Event event = EventBuilder.withBody(body, Charsets.UTF_8);
+    Event expected = EventBuilder.withBody(body, Charsets.UTF_8);
+    expected.getHeaders().put("timestamp", Long.toString(expectedMillis));
+    expected.getHeaders().put("data", ":44,338");
+
+    Event actual = fixture.intercept(event);
+
+    Assert.assertArrayEquals(expected.getBody(), actual.getBody());
+    Assert.assertEquals(expected.getHeaders(), actual.getHeaders());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9f9c0481/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
new file mode 100644
index 0000000..1f87d9a
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import junit.framework.Assert;
+
+import org.apache.flume.Context;
+import org.junit.Test;
+
+public class TestRegexExtractorInterceptorMillisSerializer {
+
+  @Test
+  public void shouldRequirePatternInConfiguration() {
+    try {
+      RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer();
+      fixture.configure(new Context());
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Expected...
+    }
+  }
+
+  @Test
+  public void shouldRequireValidPatternInConfiguration() {
+    try {
+      RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer();
+      Context context = new Context();
+      context.put("pattern", "ABCDEFG");
+      fixture.configure(context);
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Expected...
+    }
+  }
+
+  /**
+   * Verifies that a value matching the configured pattern is returned as its
+   * epoch-millisecond representation.
+   */
+  @Test
+  public void shouldReturnMillisFromPattern() {
+    String pattern = "yyyy-MM-dd HH:mm:ss";
+    String input = "2010-03-26 08:22:33";
+    RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer();
+    Context context = new Context();
+    context.put("pattern", pattern);
+    fixture.configure(context);
+
+    // The serializer parses without an explicit time zone, so a hard-coded
+    // epoch value only holds in one zone. Derive the expectation with the
+    // same formatter settings instead.
+    long expectedMillis = org.joda.time.format.DateTimeFormat
+        .forPattern(pattern).parseMillis(input);
+    Assert.assertEquals(Long.toString(expectedMillis),
+        fixture.serialize(input));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9f9c0481/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
new file mode 100644
index 0000000..569c274
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import junit.framework.Assert;
+
+import org.apache.flume.Context;
+import org.junit.Test;
+
+public class TestRegexExtractorInterceptorPassThroughSerializer {
+
+  @Test
+  public void shouldReturnSameValue() {
+    RegexExtractorInterceptorPassThroughSerializer fixture = new RegexExtractorInterceptorPassThroughSerializer();
+    fixture.configure(new Context());
+    String input = "testing (1,2,3,4)";
+    Assert.assertEquals(input, fixture.serialize(input));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9f9c0481/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 6670aa8..f1a895c 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2095,6 +2095,58 @@ regex             ".*"     Regular expression for matching against
events
 excludeEvents     false    If true, regex determines events to exclude, otherwise regex determines
events to include.
 ================  =======  ========================================================================
 
+Regex Extractor Interceptor
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This interceptor extracts regex match groups using a specified regular expression and appends the match groups as headers on the event. It also supports pluggable serializers for formatting the match groups before adding them as event headers.
+
+================  ============================== =================================================================================================
+Property Name     Default                        Description
+================  ============================== =================================================================================================
+**type**          --                             The component type name has to be ``REGEX_EXTRACTOR``
+**regex**         --                             Regular expression for matching against events
+**serializers**   --                             Serializers for mapping matches to header names and serializing their values. (See example below)
+                                                 The following are supported serializers out of the box
+                                                 ``org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer``
+                                                 ``org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer``
+================  ============================== =================================================================================================
+
+The serializers are used to map the matches to a header name and a formatted header value. By default you only need to specify
+the header name and the default ``org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer`` will be used.
+This serializer simply maps the matches to the specified header name and passes the value through as it was extracted by the regex.
+You can plug custom serializer implementations into the extractor using the fully qualified class name (FQCN) to format the matches
+in any way you like.
+
+Example 1:
+~~~~~~~~~~
+
+If the Flume event body contained ``1:2:3.4foobar5`` and the following configuration was used
+
+.. code-block:: properties
+
+  agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
+  agent.sources.r1.interceptors.i1.serializers = s1 s2 s3
+  agent.sources.r1.interceptors.i1.serializers.s1.name = one
+  agent.sources.r1.interceptors.i1.serializers.s2.name = two
+  agent.sources.r1.interceptors.i1.serializers.s3.name = three
+
+the extracted event will contain the same body but the following headers will have been added ``one=>1, two=>2, three=>3``
+
+Example 2:
+~~~~~~~~~~
+
+If the Flume event body contained ``2012-10-18 18:47:57,614 some log line`` and the following configuration was used
+
+.. code-block:: properties
+
+  agent.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
+  agent.sources.r1.interceptors.i1.serializers = s1
+  agent.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
+  agent.sources.r1.interceptors.i1.serializers.s1.name = timestamp
+  agent.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
+
+the extracted event will contain the same body but the following headers will have been added ``timestamp=>1350611220000``
+
 Flume Properties
 ----------------
 


Mime
View raw message