flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From juha...@apache.org
Subject svn commit: r1351641 - in /incubator/flume/trunk/flume-ng-core/src: main/java/org/apache/flume/interceptor/HostInterceptor.java main/java/org/apache/flume/interceptor/InterceptorType.java test/java/org/apache/flume/interceptor/TestHostInterceptor.java
Date Tue, 19 Jun 2012 10:23:28 GMT
Author: juhanic
Date: Tue Jun 19 10:23:28 2012
New Revision: 1351641

URL: http://svn.apache.org/viewvc?rev=1351641&view=rev
Log:
Flume-1284 Need host interceptor for hdfs bucket path escape sequence

(Will McQueen via Juhani Connolly)

Added:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestHostInterceptor.java
Modified:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java

Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java?rev=1351641&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java
(added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/HostInterceptor.java
Tue Jun 19 10:23:28 2012
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.interceptor;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flume.interceptor.HostInterceptor.Constants.*;
+
+/**
+ * Interceptor that stamps every intercepted event with the identity of the
+ * host on which this interceptor runs.<p>
+ * The header is named <code>host</code>; its value is either the IP address
+ * or the fully-qualified domain name of the local host.<p>
+ *
+ * The local address is resolved each time the header is about to be set.<p>
+ * Properties:<p>
+ *
+ *   preserveExisting: Whether to preserve an existing value for 'host'
+ *                     (default is false)<p>
+ *
+ *   useIP: Whether to use IP address or fully-qualified hostname for 'host'
+ *          header value (default is true)<p>
+ *
+ *
+ * Sample config:<p>
+ *
+ * <code>
+ *   agent.sources.r1.channels = c1<p>
+ *   agent.sources.r1.type = SEQ<p>
+ *   agent.sources.r1.interceptors = i1<p>
+ *   agent.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder<p>
+ *   agent.sources.r1.interceptors.i1.preserveExisting = true<p>
+ *   agent.sources.r1.interceptors.i1.useIP = false<p>
+ * </code>
+ *
+ */
+public class HostInterceptor implements Interceptor {
+
+  private static final Logger logger = LoggerFactory
+          .getLogger(HostInterceptor.class);
+
+  private final boolean preserveExisting;
+  private final boolean useIP;
+
+  /**
+   * Only {@link HostInterceptor.Builder} can build me
+   */
+  private HostInterceptor(boolean preserveExisting, boolean useIP) {
+    this.preserveExisting = preserveExisting;
+    this.useIP = useIP;
+  }
+
+  @Override
+  public void initialize() {
+    // no-op
+  }
+
+  /**
+   * Sets the {@code host} header on the given event, modifying it in-place.
+   * An existing value is kept when {@code preserveExisting} is set, and the
+   * event is left untouched if the local host address cannot be determined.
+   *
+   * @param event the event whose headers are updated
+   * @return the same event instance that was passed in
+   */
+  @Override
+  public Event intercept(Event event) {
+    Map<String, String> headers = event.getHeaders();
+
+    if (preserveExisting && headers.containsKey(HOST)) {
+      return event;
+    }
+
+    InetAddress addr;
+    try {
+      addr = InetAddress.getLocalHost();
+    } catch (UnknownHostException e) {
+      logger.warn("Could not get local host address. Exception follows.", e);
+      return event;
+    }
+
+    // Header value: textual IP address, or the canonical host name.
+    headers.put(HOST,
+        useIP ? addr.getHostAddress() : addr.getCanonicalHostName());
+
+    return event;
+  }
+
+  /**
+   * Delegates to {@link #intercept(Event)} in a loop.
+   * @param events the events to tag; each one is modified in-place
+   * @return the same list instance that was passed in
+   */
+  @Override
+  public List<Event> intercept(List<Event> events) {
+    for (Event event : events) {
+      intercept(event);
+    }
+    return events;
+  }
+
+  @Override
+  public void close() {
+    // no-op
+  }
+
+  /**
+   * Builds HostInterceptor instances; defaults apply until configured.
+   */
+  public static class Builder implements Interceptor.Builder {
+
+    private boolean preserveExisting = PRESERVE_DFLT;
+    private boolean useIP = USE_IP_DFLT;
+
+    @Override
+    public Interceptor build() {
+      return new HostInterceptor(preserveExisting, useIP);
+    }
+
+    @Override
+    public void configure(Context context) {
+      preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
+      useIP = context.getBoolean(USE_IP, USE_IP_DFLT);
+    }
+
+  }
+
+  /** Header name plus the configuration keys and their default values. */
+  public static class Constants {
+    public static final String HOST = "host";
+
+    public static final String PRESERVE = "preserveExisting";
+    public static final boolean PRESERVE_DFLT = false;
+    public static final String USE_IP = "useIP";
+    public static final boolean USE_IP_DFLT = true;
+  }
+
+}

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java?rev=1351641&r1=1351640&r2=1351641&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
(original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
Tue Jun 19 10:23:28 2012
@@ -20,7 +20,9 @@ package org.apache.flume.interceptor;
 
 public enum InterceptorType {
 
-  TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class);
+  TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class),
+  HOST(org.apache.flume.interceptor.HostInterceptor.Builder.class),
+  ;
 
   private final Class<? extends Interceptor.Builder> builderClass;
 

Added: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestHostInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestHostInterceptor.java?rev=1351641&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestHostInterceptor.java
(added)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestHostInterceptor.java
Tue Jun 19 10:23:28 2012
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.interceptor;
+
+import java.net.InetAddress;
+
+import com.google.common.base.Charsets;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.interceptor.HostInterceptor.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHostInterceptor {
+
+  /**
+   * Ensure an unconfigured interceptor sets the "host" header (to any value)
+   */
+  @Test
+  public void testBasic() throws Exception {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+            InterceptorType.HOST.toString());
+    Interceptor interceptor = builder.build();
+
+    Event eventBeforeIntercept = EventBuilder.withBody("test event",
+            Charsets.UTF_8);
+    Assert.assertNull(eventBeforeIntercept.getHeaders().get(Constants.HOST));
+
+    Event eventAfterIntercept = interceptor.intercept(eventBeforeIntercept);
+    String actualHost = eventAfterIntercept.getHeaders().get(Constants.HOST);
+
+    Assert.assertNotNull(actualHost);
+  }
+
+  /**
+   * Ensure a pre-set host is NOT overwritten when preserveExisting=true.
+   */
+  @Test
+  public void testPreserve() throws Exception {
+    Context ctx = new Context();
+    ctx.put("preserveExisting", "true");
+
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+            InterceptorType.HOST.toString());
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    final String ORIGINAL_HOST = "originalhost";
+    Event eventBeforeIntercept = EventBuilder.withBody("test event",
+            Charsets.UTF_8);
+    eventBeforeIntercept.getHeaders().put(Constants.HOST, ORIGINAL_HOST);
+    Assert.assertEquals(ORIGINAL_HOST,
+            eventBeforeIntercept.getHeaders().get(Constants.HOST));
+
+    String expectedHost = ORIGINAL_HOST;
+    Event eventAfterIntercept = interceptor.intercept(eventBeforeIntercept);
+    String actualHost = eventAfterIntercept.getHeaders().get(Constants.HOST);
+
+    Assert.assertNotNull(actualHost);
+    Assert.assertEquals(expectedHost, actualHost);
+  }
+
+  /**
+   * Ensure a pre-set host IS overwritten (by local IP) when preserveExisting=false.
+   */
+  @Test
+  public void testClobber() throws Exception {
+    Context ctx = new Context();
+    ctx.put("preserveExisting", "false"); // default behavior
+
+    Interceptor.Builder builder = InterceptorBuilderFactory
+            .newInstance(InterceptorType.HOST.toString());
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    final String ORIGINAL_HOST = "originalhost";
+    Event eventBeforeIntercept = EventBuilder.withBody("test event",
+            Charsets.UTF_8);
+    eventBeforeIntercept.getHeaders().put(Constants.HOST, ORIGINAL_HOST);
+    Assert.assertEquals(ORIGINAL_HOST, eventBeforeIntercept.getHeaders()
+            .get(Constants.HOST));
+
+    String expectedHost = InetAddress.getLocalHost().getHostAddress();
+    Event eventAfterIntercept = interceptor.intercept(eventBeforeIntercept);
+    String actualHost = eventAfterIntercept.getHeaders().get(Constants.HOST);
+
+    Assert.assertNotNull(actualHost);
+    Assert.assertEquals(expectedHost, actualHost);
+  }
+
+  /**
+   * Ensure host IP is used instead of host name when useIP=true is set explicitly.
+   */
+  @Test
+  public void testUseIP() throws Exception {
+    Context ctx = new Context();
+    ctx.put("useIP", "true"); // default behavior
+
+    Interceptor.Builder builder = InterceptorBuilderFactory
+            .newInstance(InterceptorType.HOST.toString());
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    final String ORIGINAL_HOST = "originalhost";
+    Event eventBeforeIntercept = EventBuilder.withBody("test event",
+            Charsets.UTF_8);
+    eventBeforeIntercept.getHeaders().put(Constants.HOST, ORIGINAL_HOST);
+    Assert.assertEquals(ORIGINAL_HOST, eventBeforeIntercept.getHeaders()
+            .get(Constants.HOST));
+
+    String expectedHost = InetAddress.getLocalHost().getHostAddress();
+    Event eventAfterIntercept = interceptor.intercept(eventBeforeIntercept);
+    String actualHost = eventAfterIntercept.getHeaders().get(Constants.HOST);
+
+    Assert.assertNotNull(actualHost);
+    Assert.assertEquals(expectedHost, actualHost);
+  }
+
+  /**
+   * Ensure the canonical host name is used instead of host IP when useIP=false.
+   */
+  @Test
+  public void testUseHostname() throws Exception {
+    Context ctx = new Context();
+    ctx.put("useIP", "false");
+
+    Interceptor.Builder builder = InterceptorBuilderFactory
+            .newInstance(InterceptorType.HOST.toString());
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    final String ORIGINAL_HOST = "originalhost";
+    Event eventBeforeIntercept = EventBuilder.withBody("test event",
+            Charsets.UTF_8);
+    eventBeforeIntercept.getHeaders().put(Constants.HOST, ORIGINAL_HOST);
+    Assert.assertEquals(ORIGINAL_HOST, eventBeforeIntercept.getHeaders()
+            .get(Constants.HOST));
+
+    String expectedHost = InetAddress.getLocalHost().getCanonicalHostName();
+    Event eventAfterIntercept = interceptor.intercept(eventBeforeIntercept);
+    String actualHost = eventAfterIntercept.getHeaders().get(Constants.HOST);
+
+    Assert.assertNotNull(actualHost);
+    Assert.assertEquals(expectedHost, actualHost);
+  }
+
+}



Mime
View raw message