flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-1199. Add HTTP Post Source.
Date Fri, 19 Oct 2012 00:58:43 GMT
Updated Branches:
  refs/heads/flume-1.4 ba7a42ad1 -> 80fe9324f


FLUME-1199. Add HTTP Post Source.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/80fe9324
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/80fe9324
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/80fe9324

Branch: refs/heads/flume-1.4
Commit: 80fe9324f8a6b00a9afbaf4c658c3650469a0aca
Parents: ba7a42a
Author: Mike Percy <mpercy@apache.org>
Authored: Thu Oct 18 17:56:59 2012 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Thu Oct 18 17:58:19 2012 -0700

----------------------------------------------------------------------
 flume-ng-core/pom.xml                              |   16 +
 .../flume/source/http/HTTPBadRequestException.java |   43 ++
 .../org/apache/flume/source/http/HTTPSource.java   |  206 +++++++++
 .../http/HTTPSourceConfigurationConstants.java     |   34 ++
 .../flume/source/http/HTTPSourceHandler.java       |   45 ++
 .../org/apache/flume/source/http/JSONHandler.java  |  130 ++++++
 .../http/FlumeHttpServletRequestWrapper.java       |  322 +++++++++++++++
 .../apache/flume/source/http/TestHTTPSource.java   |  298 +++++++++++++
 .../apache/flume/source/http/TestJSONHandler.java  |  247 +++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   81 ++++
 .../java/org/apache/flume/event/JSONEvent.java     |   67 +++
 pom.xml                                            |   18 +
 12 files changed, 1507 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/flume-ng-core/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml
index 6f2341d..a03d59d 100644
--- a/flume-ng-core/pom.xml
+++ b/flume-ng-core/pom.xml
@@ -174,11 +174,27 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
     </dependency>
 
     <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPBadRequestException.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPBadRequestException.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPBadRequestException.java
new file mode 100644
index 0000000..32c92cd
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPBadRequestException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.http;
+
+import org.apache.flume.FlumeException;
+
+/**
+ *
+ * Exception thrown by an HTTP Handler if the request was not parsed correctly
+ * into an event because the request was not in the expected format.
+ *
+ */
+public class HTTPBadRequestException extends FlumeException {
+
+  private static final long serialVersionUID = -3540764742069390951L;
+
+  public HTTPBadRequestException(String msg) {
+    super(msg);
+  }
+
+  public HTTPBadRequestException(String msg, Throwable th) {
+    super(msg, th);
+  }
+
+  public HTTPBadRequestException(Throwable th) {
+    super(th);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
new file mode 100644
index 0000000..d4d818a
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.http;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.AbstractSource;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A source which accepts Flume Events by HTTP POST and GET. GET should be used
+ * for experimentation only. HTTP requests are converted into flume events by a
+ * pluggable "handler" which must implement the
+ * {@linkplain HTTPSourceHandler} interface. This handler takes a
+ * {@linkplain HttpServletRequest} and returns a list of flume events.
+ *
+ * The source accepts the following parameters: <p> <tt>port</tt>: port to which
+ * the server should bind. Mandatory <p> <tt>handler</tt>: the class that
+ * deserializes a HttpServletRequest into a list of flume events. This class
+ * must implement HTTPSourceHandler. Default:
+ * {@linkplain JSONHandler}. <p> <tt>handler.*</tt> Any configuration
+ * to be passed to the handler. <p>
+ *
+ * All events deserialized from one Http request are committed to the channel in
+ * one transaction, thus allowing for increased efficiency on channels like the
+ * file channel. If the handler throws an exception this source will return
+ * a HTTP status of 400. If the channel is full, or the source is unable to
+ * append events to the channel, the source will return a HTTP 503 - Temporarily
+ * unavailable status.
+ *
+ * A JSON handler which converts JSON objects to Flume events is provided.
+ *
+ */
+public class HTTPSource extends AbstractSource implements
+        EventDrivenSource, Configurable {
+  /*
+   * There are 2 ways of doing this:
+   * a. Have a static server instance and use connectors in each source
+   *    which binds to the port defined for that source.
+   * b. Each source starts its own server instance, which binds to the source's
+   *    port.
+   *
+   * b is more efficient than a because Jetty does not allow binding a
+   * servlet to a connector. So, with a, each request would need to go through
+   * each of the handlers/servlets till the correct one is found.
+   *
+   */
+
+  private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);
+  private volatile Integer port;
+  private volatile Server srv;
+  private HTTPSourceHandler handler;
+
+  @Override
+  public void configure(Context context) {
+    try {
+      port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
+      checkPort();
+      String handlerClassName = context.getString(
+              HTTPSourceConfigurationConstants.CONFIG_HANDLER,
+              HTTPSourceConfigurationConstants.DEFAULT_HANDLER);
+      @SuppressWarnings("unchecked")
+      Class<? extends HTTPSourceHandler> clazz =
+              (Class<? extends HTTPSourceHandler>)
+              Class.forName(handlerClassName);
+      handler = clazz.getDeclaredConstructor().newInstance();
+      //ref: http://docs.codehaus.org/display/JETTY/Embedding+Jetty
+      //ref: http://jetty.codehaus.org/jetty/jetty-6/apidocs/org/mortbay/jetty/servlet/Context.html
+      Map<String, String> subProps =
+              context.getSubProperties(
+              HTTPSourceConfigurationConstants.CONFIG_HANDLER_PREFIX);
+      handler.configure(new Context(subProps));
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Error while configuring HTTPSource. Exception follows.", ex);
+      Throwables.propagate(ex);
+    } catch (ClassCastException ex) {
+      LOG.error("Deserializer is not an instance of HTTPSourceHandler."
+              + "Deserializer must implement HTTPSourceHandler.");
+      Throwables.propagate(ex);
+    } catch (Exception ex) {
+      LOG.error("Error configuring HTTPSource!", ex);
+      Throwables.propagate(ex);
+    }
+  }
+
+  @Override
+  public void start() {
+    checkPort();
+    Preconditions.checkState(srv == null,
+            "Running HTTP Server found in source: " + getName()
+            + " before I started one."
+            + "Will not attempt to start.");
+    srv = new Server(port);
+    try {
+      org.mortbay.jetty.servlet.Context root =
+              new org.mortbay.jetty.servlet.Context(
+              srv, "/", org.mortbay.jetty.servlet.Context.SESSIONS);
+      root.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/");
+      srv.start();
+      Preconditions.checkArgument(srv.getHandler().equals(root));
+    } catch (Exception ex) {
+      LOG.error("Error while starting HTTPSource. Exception follows.", ex);
+      Throwables.propagate(ex);
+    }
+    Preconditions.checkArgument(srv.isRunning());
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    try {
+      srv.stop();
+      srv.join();
+      srv = null;
+    } catch (Exception ex) {
+      LOG.error("Error while stopping HTTPSource. Exception follows.", ex);
+    }
+  }
+
+  private void checkPort() {
+    Preconditions.checkNotNull(port, "HTTPSource requires a port number to be"
+            + "specified");
+  }
+
+  private class FlumeHTTPServlet extends HttpServlet {
+
+    private static final long serialVersionUID = 4891924863218790344L;
+
+    @Override
+    public void doPost(HttpServletRequest request, HttpServletResponse response)
+            throws IOException {
+      List<Event> events = new ArrayList<Event>(0); //create empty list
+      try {
+        events = handler.getEvents(request);
+      } catch (HTTPBadRequestException ex) {
+        LOG.warn("Received bad request from client. ", ex);
+        response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+                "Bad request from client. "
+                + ex.getMessage());
+        return;
+      } catch (Exception ex) {
+        LOG.warn("Deserializer threw unexpected exception. ", ex);
+        response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+                "Deserializer threw unexpected exception. "
+                + ex.getMessage());
+        return;
+      }
+      try {
+        getChannelProcessor().processEventBatch(events);
+      } catch (ChannelException ex) {
+        LOG.warn("Error appending event to channel. "
+                + "Channel might be full. Consider increasing the channel"
+                + "capacity or make sure the sinks perform faster.", ex);
+        response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
+                "Error appending event to channel. Channel might be full."
+                + ex.getMessage());
+        return;
+      } catch (Exception ex) {
+        LOG.warn("Unexpected error appending event to channel. ", ex);
+        response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+                "Unexpected error while appending event to channel. "
+                + ex.getMessage());
+        return;
+      }
+      response.setCharacterEncoding(request.getCharacterEncoding());
+      response.setStatus(HttpServletResponse.SC_OK);
+      response.flushBuffer();
+    }
+
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response)
+            throws IOException {
+      doPost(request, response);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
new file mode 100644
index 0000000..55800f8
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.source.http;
+
+/**
+ *
+ */
+public class HTTPSourceConfigurationConstants {
+
+  public static final String CONFIG_PORT = "port";
+  public static final String CONFIG_HANDLER = "handler";
+  public static final String CONFIG_HANDLER_PREFIX =
+          CONFIG_HANDLER + ".";
+
+  public static final String DEFAULT_HANDLER =
+          "org.apache.flume.source.http.JSONHandler";
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java
new file mode 100644
index 0000000..726bf0c
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.http;
+
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+
+/**
+ *
+ */
+public interface HTTPSourceHandler extends Configurable {
+
+  /**
+   * Takes an {@linkplain HttpServletRequest} and returns a list of Flume
+   * Events. If this request cannot be parsed into Flume events based on the
+   * format this method will throw an exception. This method may also throw an
+   * exception if there is some sort of other error. <p>
+   *
+   * @param request The request to be parsed into Flume events.
+   * @return List of Flume events generated from the request.
+   * @throws HTTPBadRequestException If the request was not parsed correctly
+   * into an event because the request was not in the expected format.
+   * @throws Exception If there was an unexpected error.
+   */
+  public List<Event> getEvents(HttpServletRequest request) throws
+          HTTPBadRequestException, Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java
new file mode 100644
index 0000000..9e1af7a
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.http;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.reflect.TypeToken;
+import java.io.BufferedReader;
+import java.lang.reflect.Type;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.JSONEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * JSONHandler for HTTPSource that accepts an array of events.
+ *
+ * This handler throws exception if the deserialization fails because of bad
+ * format or any other reason.
+ *
+ *
+ * Each event must be encoded as a map with two key-value pairs. <p> 1. headers
+ * - the key for this key-value pair is "headers". The value for this key is
+ * another map, which represent the event headers. These headers are inserted
+ * into the Flume event as is. <p> 2. body - The body is a string which
+ * represents the body of the event. The key for this key-value pair is "body".
+ * All key-value pairs are considered to be headers. An example: <p> [{"headers"
+ * : {"a":"b", "c":"d"},"body": "random_body"}, {"headers" : {"e": "f"},"body":
+ * "random_body2"}] <p> would be interpreted as the following two flume events:
+ * <p> * Event with body: "random_body" (in UTF-8/UTF-16/UTF-32 encoded bytes)
+ * and headers : (a:b, c:d) <p> *
+ * Event with body: "random_body2" (in UTF-8/UTF-16/UTF-32 encoded bytes) and
+ * headers : (e:f) <p>
+ *
+ * The charset of the body is read from the request and used. If no charset is
+ * set in the request, then the charset is assumed to be JSON's default - UTF-8.
+ * The JSON handler supports UTF-8, UTF-16 and UTF-32.
+ *
+ * To set the charset, the request must have content type specified as
+ * "application/json; charset=UTF-8" (replace UTF-8 with UTF-16 or UTF-32 as
+ * required).
+ *
+ * One way to create an event in the format expected by this handler, is to
+ * use {@linkplain JSONEvent} and use {@linkplain Gson} to create the JSON
+ * string using the
+ * {@linkplain Gson#toJson(java.lang.Object, java.lang.reflect.Type) }
+ * method. The type token to pass as the 2nd argument of this method
+ * for list of events can be created by: <p>
+ *
+ * Type type = new TypeToken<List<JSONEvent>>() {}.getType(); <p>
+ *
+ */
+
+public class JSONHandler implements HTTPSourceHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JSONHandler.class);
+  private final Type listType =
+          new TypeToken<List<JSONEvent>>() {
+          }.getType();
+  private final Gson gson;
+
+  public JSONHandler() {
+    gson = new GsonBuilder().disableHtmlEscaping().create();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<Event> getEvents(HttpServletRequest request) throws Exception {
+    BufferedReader reader = request.getReader();
+    String charset = request.getCharacterEncoding();
+    //UTF-8 is default for JSON. If no charset is specified, UTF-8 is to
+    //be assumed.
+    if (charset == null) {
+      LOG.debug("Charset is null, default charset of UTF-8 will be used.");
+      charset = "UTF-8";
+    } else if (!(charset.equalsIgnoreCase("utf-8")
+            || charset.equalsIgnoreCase("utf-16")
+            || charset.equalsIgnoreCase("utf-32"))) {
+      LOG.error("Unsupported character set in request {}. "
+              + "JSON handler supports UTF-8, "
+              + "UTF-16 and UTF-32 only.", charset);
+      throw new UnsupportedCharsetException("JSON handler supports UTF-8, "
+              + "UTF-16 and UTF-32 only.");
+    }
+
+    /*
+     * Gson throws JsonSyntaxException if the data is not parseable to JSON.
+     * It is rethrown as HTTPBadRequestException so the source returns HTTP 400.
+     */
+    List<Event> eventList = new ArrayList<Event>(0);
+    try {
+      eventList = gson.fromJson(reader, listType);
+    } catch (JsonSyntaxException ex) {
+      throw new HTTPBadRequestException("Request has invalid JSON Syntax.", ex);
+    }
+
+    for (Event e : eventList) {
+      ((JSONEvent) e).setCharset(charset);
+    }
+    return eventList;
+  }
+
+  @Override
+  public void configure(Context context) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java
new file mode 100644
index 0000000..6b94b2e
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.http;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.security.Principal;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.Map;
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpSession;
+
+/**
+ *
+ */
+class FlumeHttpServletRequestWrapper implements HttpServletRequest {
+
+  private BufferedReader reader;
+
+  String charset;
+  public FlumeHttpServletRequestWrapper(String data, String charset) throws UnsupportedEncodingException {
+    reader = new BufferedReader(new InputStreamReader(
+            new ByteArrayInputStream(data.getBytes(charset)), charset));
+    this.charset = charset;
+  }
+
+  public FlumeHttpServletRequestWrapper(String data) throws UnsupportedEncodingException {
+    this(data, "UTF-8");
+  }
+
+  @Override
+  public String getAuthType() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Cookie[] getCookies() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public long getDateHeader(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getHeader(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Enumeration getHeaders(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Enumeration getHeaderNames() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public int getIntHeader(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getMethod() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getPathInfo() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getPathTranslated() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getContextPath() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getQueryString() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getRemoteUser() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isUserInRole(String role) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Principal getUserPrincipal() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getRequestedSessionId() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getRequestURI() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public StringBuffer getRequestURL() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getServletPath() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public HttpSession getSession(boolean create) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public HttpSession getSession() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isRequestedSessionIdValid() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isRequestedSessionIdFromCookie() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isRequestedSessionIdFromURL() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isRequestedSessionIdFromUrl() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Object getAttribute(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Enumeration getAttributeNames() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getCharacterEncoding() {
+    return charset;
+  }
+
+  @Override
+  public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
+    this.charset = env;
+  }
+
+  @Override
+  public int getContentLength() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getContentType() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public ServletInputStream getInputStream() throws IOException {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getParameter(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Enumeration getParameterNames() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String[] getParameterValues(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Map getParameterMap() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getProtocol() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getScheme() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getServerName() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public int getServerPort() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public BufferedReader getReader() throws IOException {
+    return reader;
+  }
+
+  @Override
+  public String getRemoteAddr() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getRemoteHost() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public void setAttribute(String name, Object o) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public void removeAttribute(String name) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Locale getLocale() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public Enumeration getLocales() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public boolean isSecure() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public RequestDispatcher getRequestDispatcher(String path) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getRealPath(String path) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public int getRemotePort() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getLocalName() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getLocalAddr() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public int getLocalPort() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
new file mode 100644
index 0000000..0a1b07d
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.http;
+
+import static org.fest.reflect.core.Reflection.*;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import javax.servlet.http.HttpServletResponse;
+import junit.framework.Assert;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.JSONEvent;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * End-to-end tests for {@link HTTPSource}. A single source backed by one
+ * {@link MemoryChannel} is started for the whole class; each test posts a
+ * request with Apache HttpClient and then checks the HTTP status and/or the
+ * events that arrived in the channel.
+ */
+public class TestHTTPSource {
+
+  private static HTTPSource source;
+  private static Channel channel;
+  DefaultHttpClient httpClient;
+  HttpPost postRequest;
+
+  /**
+   * Starts a memory channel configured with capacity 100 and an HTTP source
+   * configured with port 41404 that replicates into that channel.
+   */
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    source = new HTTPSource();
+    channel = new MemoryChannel();
+
+    Context ctx = new Context();
+    ctx.put("capacity", "100");
+    Configurables.configure(channel, ctx);
+
+    List<Channel> channels = new ArrayList<Channel>(1);
+    channels.add(channel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+
+    channel.start();
+    Context context = new Context();
+
+    context.put("port", String.valueOf(41404));
+
+    Configurables.configure(source, context);
+    source.start();
+  }
+
+  /** Stops the shared source and channel once all tests have run. */
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    source.stop();
+    channel.stop();
+  }
+
+  /** Creates a fresh HTTP client and POST request for every test. */
+  @Before
+  public void setUp() {
+    httpClient = new DefaultHttpClient();
+    postRequest = new HttpPost("http://0.0.0.0:41404");
+  }
+
+  /**
+   * Releases the connections held by the per-test client; without this every
+   * test would leave its connection manager (and open connection) behind.
+   */
+  @After
+  public void tearDown() {
+    httpClient.getConnectionManager().shutdown();
+  }
+
+  /** Posts two events as JSON and expects both, in order, in the channel. */
+  @Test
+  public void testSimple() throws IOException, InterruptedException {
+
+    StringEntity input = new StringEntity("[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"},"
+            + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]");
+    //if we do not set the content type to JSON, the client will use
+    //ISO-8859-1 as the charset. JSON standard does not support this.
+    input.setContentType("application/json");
+    postRequest.setEntity(input);
+
+    HttpResponse response = httpClient.execute(postRequest);
+
+    Assert.assertEquals(HttpServletResponse.SC_OK,
+            response.getStatusLine().getStatusCode());
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = channel.take();
+    Assert.assertNotNull(e);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8"));
+
+    e = channel.take();
+    Assert.assertNotNull(e);
+    Assert.assertEquals("f", e.getHeaders().get("e"));
+    Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8"));
+    tx.commit();
+    tx.close();
+  }
+
+  /** Same as {@link #testSimple()} but the request body is sent as UTF-16. */
+  @Test
+  public void testSimpleUTF16() throws IOException, InterruptedException {
+
+    StringEntity input = new StringEntity("[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"},"
+            + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]", "UTF-16");
+    input.setContentType("application/json; charset=utf-16");
+    postRequest.setEntity(input);
+
+    HttpResponse response = httpClient.execute(postRequest);
+
+    Assert.assertEquals(HttpServletResponse.SC_OK,
+            response.getStatusLine().getStatusCode());
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = channel.take();
+    Assert.assertNotNull(e);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertEquals("random_body", new String(e.getBody(), "UTF-16"));
+
+    e = channel.take();
+    Assert.assertNotNull(e);
+    Assert.assertEquals("f", e.getHeaders().get("e"));
+    Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-16"));
+    tx.commit();
+    tx.close();
+  }
+
+  /** Syntactically invalid JSON must be answered with HTTP 400. */
+  @Test
+  public void testInvalid() throws Exception {
+    StringEntity input = new StringEntity("[{\"a\": \"b\",[\"d\":\"e\"],\"body\": \"random_body\"},"
+            + "{\"e\": \"f\",\"body\": \"random_body2\"}]");
+    input.setContentType("application/json");
+    postRequest.setEntity(input);
+    HttpResponse response = httpClient.execute(postRequest);
+
+    Assert.assertEquals(HttpServletResponse.SC_BAD_REQUEST,
+            response.getStatusLine().getStatusCode());
+
+  }
+
+  @Test
+  public void testBigBatchDeserializarionUTF8() throws Exception {
+    testBatchWithVariousEncoding("UTF-8");
+  }
+
+  @Test
+  public void testBigBatchDeserializarionUTF16() throws Exception {
+    testBatchWithVariousEncoding("UTF-16");
+  }
+
+  @Test
+  public void testBigBatchDeserializarionUTF32() throws Exception {
+    testBatchWithVariousEncoding("UTF-32");
+  }
+
+  /** A one-element array is accepted and yields exactly that event. */
+  @Test
+  public void testSingleEvent() throws Exception {
+    StringEntity input = new StringEntity("[{\"headers\" : {\"a\": \"b\"},\"body\":"
+            + " \"random_body\"}]");
+    input.setContentType("application/json");
+    postRequest.setEntity(input);
+
+    HttpResponse response = httpClient.execute(postRequest);
+    // Check the status first so a rejected request is reported as such rather
+    // than as a missing event further down.
+    Assert.assertEquals(HttpServletResponse.SC_OK,
+            response.getStatusLine().getStatusCode());
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = channel.take();
+    Assert.assertNotNull(e);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertEquals("random_body", new String(e.getBody(),"UTF-8"));
+    tx.commit();
+    tx.close();
+  }
+
+  /**
+   * Posting 150 events to the channel configured with capacity 100 must be
+   * answered with HTTP 503.
+   */
+  @Test
+  public void testFullChannel() throws Exception {
+    HttpResponse response = putWithEncoding("UTF-8", 150).response;
+    Assert.assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
+            response.getStatusLine().getStatusCode());
+  }
+
+  /** An unexpected failure inside the source must be answered with HTTP 500. */
+  @Test
+  public void testFail() throws Exception {
+    HTTPSourceHandler handler = field("handler").ofType(HTTPSourceHandler.class)
+            .in(source).get();
+    //Cause an exception in the source - this is equivalent to any exception
+    //thrown by the handler since the handler is called inside a try-catch
+    field("handler").ofType(HTTPSourceHandler.class).in(source).set(null);
+    HttpResponse response = putWithEncoding("UTF-8", 1).response;
+    Assert.assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+            response.getStatusLine().getStatusCode());
+    //Set the original handler back so tests don't fail after this runs.
+    field("handler").ofType(HTTPSourceHandler.class).in(source).set(handler);
+  }
+
+  @Test
+  public void testHandlerThrowingException() throws Exception {
+    //This will cause the handler to throw an
+    //UnsupportedCharsetException.
+    HttpResponse response = putWithEncoding("ISO-8859-1", 150).response;
+    Assert.assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+            response.getStatusLine().getStatusCode());
+  }
+
+
+  /**
+   * Builds {@code n} events with ten headers each and a random numeric body,
+   * serializes them with Gson and posts them with the given charset declared
+   * in the Content-Type header.
+   *
+   * @param encoding charset name used for the event bodies and declared in
+   *                 the Content-Type header
+   * @param n number of events to send
+   * @return the HTTP response together with the events that were sent
+   */
+  private ResultWrapper putWithEncoding(String encoding, int n)
+          throws Exception{
+    Type listType = new TypeToken<List<JSONEvent>>() {
+    }.getType();
+    List<JSONEvent> events = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < n; i++) {
+      Map<String, String> input = Maps.newHashMap();
+      for (int j = 0; j < 10; j++) {
+        input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i));
+      }
+      JSONEvent e = new JSONEvent();
+      e.setHeaders(input);
+      e.setBody(String.valueOf(rand.nextGaussian()).getBytes(encoding));
+      events.add(e);
+    }
+    Gson gson = new Gson();
+    String json = gson.toJson(events, listType);
+    // NOTE(review): the entity is created without an explicit charset while
+    // the header below declares `encoding`; testSimpleUTF16 passes the charset
+    // to the StringEntity constructor instead - confirm which is intended.
+    StringEntity input = new StringEntity(json);
+    input.setContentType("application/json; charset=" + encoding);
+    postRequest.setEntity(input);
+    HttpResponse resp = httpClient.execute(postRequest);
+    return new ResultWrapper(resp, events);
+  }
+
+  /**
+   * Drains the channel and compares every taken event, in order, with the
+   * corresponding entry of {@code events}.
+   *
+   * @param encoding charset used to decode the bodies before comparing
+   * @param n expected number of events
+   * @param events the events that were sent
+   */
+  private void takeWithEncoding(String encoding, int n, List<JSONEvent> events)
+          throws Exception{
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = null;
+    int i = 0;
+    while (true) {
+      e = channel.take();
+      if (e == null) {
+        break;
+      }
+      Event current = events.get(i++);
+      Assert.assertEquals(new String(current.getBody(), encoding),
+              new String(e.getBody(), encoding));
+      Assert.assertEquals(current.getHeaders(), e.getHeaders());
+    }
+    // NOTE(review): this compares n with the size of the list that was sent,
+    // not with the number of events actually taken (i), so it also holds when
+    // the channel is empty - confirm whether `i` was intended.
+    Assert.assertEquals(n, events.size());
+    tx.commit();
+    tx.close();
+  }
+
+
+  /** Runs the batch round trip with the default batch size of 50 events. */
+  private void testBatchWithVariousEncoding(String encoding) throws Exception {
+    testBatchWithVariousEncoding(encoding, 50);
+  }
+
+  /** Posts {@code n} events in one request and verifies what was received. */
+  private void testBatchWithVariousEncoding(String encoding, int n)
+          throws Exception {
+    List<JSONEvent> events = putWithEncoding(encoding, n).events;
+    takeWithEncoding(encoding, n, events);
+  }
+
+  /**
+   * Pairs an HTTP response with the events that produced it. Static because
+   * it needs nothing from the enclosing test instance.
+   */
+  private static class ResultWrapper {
+    public final HttpResponse response;
+    public final List<JSONEvent> events;
+    public ResultWrapper(HttpResponse resp, List<JSONEvent> events){
+      this.response = resp;
+      this.events = events;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/flume-ng-core/src/test/java/org/apache/flume/source/http/TestJSONHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestJSONHandler.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestJSONHandler.java
new file mode 100644
index 0000000..e8f256b
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestJSONHandler.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.http;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import java.lang.reflect.Type;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import javax.servlet.http.HttpServletRequest;
+import junit.framework.Assert;
+import org.apache.flume.Event;
+import org.apache.flume.event.JSONEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link JSONHandler}. Each test wraps a JSON string in a
+ * {@link FlumeHttpServletRequestWrapper}, passes it to the handler and checks
+ * the headers and bodies of the returned events.
+ */
+public class TestJSONHandler {
+
+  HTTPSourceHandler handler;
+
+  /** Creates a fresh handler for every test. */
+  @Before
+  public void setUp() {
+    handler = new JSONHandler();
+  }
+
+  /** Two events, no charset given on the request. */
+  @Test
+  public void testMultipleEvents() throws Exception {
+    String json = "[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"},"
+            + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
+    List<Event> deserialized = handler.getEvents(req);
+    Event e = deserialized.get(0);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertEquals("random_body", new String(e.getBody(),"UTF-8"));
+    e = deserialized.get(1);
+    Assert.assertEquals("f", e.getHeaders().get("e"));
+    Assert.assertEquals("random_body2", new String(e.getBody(),"UTF-8"));
+
+  }
+
+  /** Two events on a request that declares UTF-16. */
+  @Test
+  public void testMultipleEventsUTF16() throws Exception {
+    String json = "[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"},"
+            + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, "UTF-16");
+    List<Event> deserialized = handler.getEvents(req);
+    Event e = deserialized.get(0);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertEquals("random_body", new String(e.getBody(), "UTF-16"));
+    e = deserialized.get(1);
+    Assert.assertEquals("f", e.getHeaders().get("e"));
+    Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-16"));
+
+  }
+
+  /** Two events on a request that declares UTF-32. */
+  @Test
+  public void testMultipleEventsUTF32() throws Exception {
+    String json = "[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"},"
+            + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, "UTF-32");
+    List<Event> deserialized = handler.getEvents(req);
+    Event e = deserialized.get(0);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertEquals("random_body", new String(e.getBody(), "UTF-32"));
+    e = deserialized.get(1);
+    Assert.assertEquals("f", e.getHeaders().get("e"));
+    Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-32"));
+  }
+
+  /** Two events on a request that explicitly declares UTF-8. */
+  @Test
+  public void testMultipleEventsUTF8() throws Exception {
+    String json = "[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"},"
+            + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, "UTF-8");
+    List<Event> deserialized = handler.getEvents(req);
+    Event e = deserialized.get(0);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8"));
+    e = deserialized.get(1);
+    Assert.assertEquals("f", e.getHeaders().get("e"));
+    Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8"));
+
+  }
+
+  /** An escaped double quote inside a body must survive deserialization. */
+  @Test
+  public void testEscapedJSON() throws Exception {
+    //JSON allows escaping double quotes to add it in the data.
+    String json = "[{\"headers\":{\"a\": \"b\"}},"
+            + "{\"headers\":{\"e\": \"f\"},\"body\": \"rand\\\"om_body2\"}]";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
+    List<Event> deserialized = handler.getEvents(req);
+    Event e = deserialized.get(0);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertNull(e.getBody());
+    e = deserialized.get(1);
+    Assert.assertEquals("f", e.getHeaders().get("e"));
+    Assert.assertEquals("rand\"om_body2", new String(e.getBody(),"UTF-8"));
+  }
+
+  /** An event without a "body" member yields a null body. */
+  @Test
+  public void testNoBody() throws Exception {
+    String json = "[{\"headers\" : {\"a\": \"b\"}},"
+            + "{\"headers\" : {\"e\": \"f\"},\"body\": \"random_body2\"}]";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
+    List<Event> deserialized = handler.getEvents(req);
+    Event e = deserialized.get(0);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertNull(e.getBody());
+    e = deserialized.get(1);
+    Assert.assertEquals("f", e.getHeaders().get("e"));
+    Assert.assertEquals("random_body2", new String(e.getBody(),"UTF-8"));
+  }
+
+  /** Markup inside a body is passed through unchanged. */
+  @Test
+  public void testSingleHTMLEvent() throws Exception {
+    String json = "[{\"headers\": {\"a\": \"b\"},"
+            + "\"body\": \"<html><body>test</body></html>\"}]";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
+    List<Event> deserialized = handler.getEvents(req);
+    Event e = deserialized.get(0);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertEquals("<html><body>test</body></html>",
+            new String(e.getBody(),"UTF-8"));
+  }
+
+  @Test
+  public void testSingleEvent() throws Exception {
+    String json = "[{\"headers\" : {\"a\": \"b\"},\"body\": \"random_body\"}]";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
+    List<Event> deserialized = handler.getEvents(req);
+    Event e = deserialized.get(0);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertEquals("random_body", new String(e.getBody(),"UTF-8"));
+  }
+
+  /**
+   * Malformed JSON must be rejected with {@link HTTPBadRequestException}.
+   * JUnit's {@code expected} attribute already fails the test when nothing is
+   * thrown, so no explicit fail() is needed after the call.
+   */
+  @Test(expected = HTTPBadRequestException.class)
+  public void testBadEvent() throws Exception {
+    String json = "{[\"a\": \"b\"],\"body\": \"random_body\"}";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
+    handler.getEvents(req);
+  }
+
+  /**
+   * A request declaring ISO-8859-1 must be rejected with
+   * {@link UnsupportedCharsetException}; as above, {@code expected} covers the
+   * "nothing thrown" case.
+   */
+  @Test(expected = UnsupportedCharsetException.class)
+  public void testError() throws Exception {
+    String json = "[{\"headers\" : {\"a\": \"b\"},\"body\": \"random_body\"}]";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json, "ISO-8859-1");
+    handler.getEvents(req);
+  }
+
+  @Test
+  public void testSingleEventInArray() throws Exception {
+    String json = "[{\"headers\": {\"a\": \"b\"},\"body\": \"random_body\"}]";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
+    List<Event> deserialized = handler.getEvents(req);
+    Event e = deserialized.get(0);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertEquals("random_body", new String(e.getBody(),"UTF-8"));
+  }
+
+  /** Three events with several headers each. */
+  @Test
+  public void testMultipleLargeEvents() throws Exception {
+    String json = "[{\"headers\" : {\"a\": \"b\", \"a2\": \"b2\","
+            + "\"a3\": \"b3\",\"a4\": \"b4\"},\"body\": \"random_body\"},"
+            + "{\"headers\" :{\"e\": \"f\",\"e2\": \"f2\","
+            + "\"e3\": \"f3\",\"e4\": \"f4\",\"e5\": \"f5\"},"
+            + "\"body\": \"random_body2\"},"
+            + "{\"headers\" :{\"q1\": \"b\",\"q2\": \"b2\",\"q3\": \"b3\",\"q4\": \"b4\"},"
+            + "\"body\": \"random_bodyq\"}]";
+    HttpServletRequest req = new FlumeHttpServletRequestWrapper(json);
+    List<Event> deserialized = handler.getEvents(req);
+    Event e = deserialized.get(0);
+    Assert.assertNotNull(e);
+    Assert.assertEquals("b", e.getHeaders().get("a"));
+    Assert.assertEquals("b2", e.getHeaders().get("a2"));
+    Assert.assertEquals("b3", e.getHeaders().get("a3"));
+    Assert.assertEquals("b4", e.getHeaders().get("a4"));
+    Assert.assertEquals("random_body", new String(e.getBody(),"UTF-8"));
+    e = deserialized.get(1);
+    Assert.assertNotNull(e);
+    Assert.assertEquals("f", e.getHeaders().get("e"));
+    Assert.assertEquals("f2", e.getHeaders().get("e2"));
+    Assert.assertEquals("f3", e.getHeaders().get("e3"));
+    Assert.assertEquals("f4", e.getHeaders().get("e4"));
+    Assert.assertEquals("f5", e.getHeaders().get("e5"));
+    Assert.assertEquals("random_body2", new String(e.getBody(),"UTF-8"));
+    e = deserialized.get(2);
+    Assert.assertNotNull(e);
+    Assert.assertEquals("b", e.getHeaders().get("q1"));
+    Assert.assertEquals("b2", e.getHeaders().get("q2"));
+    Assert.assertEquals("b3", e.getHeaders().get("q3"));
+    Assert.assertEquals("b4", e.getHeaders().get("q4"));
+    Assert.assertEquals("random_bodyq", new String(e.getBody(),"UTF-8"));
+
+  }
+
+  /**
+   * Round trip: events serialized with Gson must come back from the handler
+   * with the same bodies and headers, in the same order.
+   */
+  @Test
+  public void testDeserializarion() throws Exception {
+    Type listType = new TypeToken<List<JSONEvent>>() {
+    }.getType();
+    List<JSONEvent> events = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 1; i < 10; i++) {
+      Map<String, String> input = Maps.newHashMap();
+      for (int j = 1; j < 10; j++) {
+        input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i));
+      }
+      JSONEvent e = new JSONEvent();
+      e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8"));
+      e.setHeaders(input);
+      events.add(e);
+    }
+    Gson gson = new Gson();
+    List<Event> deserialized = handler.getEvents(
+            new FlumeHttpServletRequestWrapper(gson.toJson(events, listType)));
+    // Without this check an empty result would make the loop below a no-op
+    // and the test would pass without comparing anything.
+    Assert.assertEquals(events.size(), deserialized.size());
+    int i = 0;
+    for (Event e : deserialized) {
+      Event current = events.get(i++);
+      Assert.assertEquals(new String(current.getBody(),"UTF-8"),
+              new String(e.getBody(),"UTF-8"));
+      Assert.assertEquals(current.getHeaders(), e.getHeaders());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 86bbd1a..29ead84 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -791,6 +791,87 @@ For example, a syslog UDP source for agent named **agent_foo**:
   agent_foo.sources.syslogsource-1.host = localhost
   agent_foo.sources.syslogsource-1.channels = memoryChannel-1
 
+HTTP Source
+~~~~~~~~~~~
+A source which accepts Flume Events by HTTP POST and GET. GET should be used
+for experimentation only. HTTP requests are converted into Flume events by
+a pluggable "handler" which must implement the HTTPSourceHandler interface.
+This handler takes an HttpServletRequest and returns a list of
+Flume events. All events handled from one HTTP request are committed to the channel
+in one transaction, thus allowing for increased efficiency on channels like
+the file channel. If the handler rejects a request as malformed (by throwing
+HTTPBadRequestException), this source returns an HTTP status of 400; any other
+handler exception results in a 500. If the channel is full, or the source is unable
+to append events to the channel, the source returns an HTTP 503 (Service Unavailable).
+
+All events sent in one post request are considered to be one batch and
+inserted into the channel in one transaction.
+
+==============  ============================================  ====================================================================
+Property Name   Default                                       Description
+==============  ============================================  ====================================================================
+**type**                                                      The FQCN of this class:  ``org.apache.flume.source.http.HTTPSource``
+**port**        --                                            The port the source should bind to.
+handler         ``org.apache.flume.source.http.JSONHandler``  The FQCN of the handler class.
+handler.*       --                                            Config parameters for the handler
+selector.type   replicating                                   replicating or multiplexing
+selector.*                                                    Depends on the selector.type value
+interceptors    --                                            Space separated list of interceptors
+interceptors.*
+==================================================================================================================================
+
+For example, an HTTP source for agent named **agent_foo**:
+
+.. code-block:: properties
+
+  agent_foo.sources = httpsource-1
+  agent_foo.channels = memoryChannel-1
+  agent_foo.sources.httpsource-1.type = org.apache.flume.source.http.HTTPSource
+  agent_foo.sources.httpsource-1.port = 5140
+  agent_foo.sources.httpsource-1.channels = memoryChannel-1
+  agent_foo.sources.httpsource-1.handler = org.example.rest.RestHandler
+  agent_foo.sources.httpsource-1.handler.nickname = random props
+
+JSONHandler
+'''''''''''
+A handler is provided out of the box which can handle events represented in
+JSON format, and supports UTF-8, UTF-16 and UTF-32 character sets. The handler
+accepts an array of events (even if there is only one event, the event has to be
+sent in an array) and converts them to a Flume event based on the
+encoding specified in the request. If no encoding is specified, UTF-8 is assumed.
+The JSON handler supports UTF-8, UTF-16 and UTF-32.
+Events are represented as follows.
+
+.. code-block:: javascript
+
+  [{
+    "headers" : {
+               "timestamp" : "434324343",
+               "host" : "random_host.example.com"
+               },
+    "body" : "random_body"
+    },
+    {
+    "headers" : {
+               "namenode" : "namenode.example.com",
+               "datanode" : "random_datanode.example.com"
+               },
+    "body" : "really_random_body"
+    }]
+
+To set the charset, the request must have content type specified as
+``application/json; charset=UTF-8`` (replace UTF-8 with UTF-16 or UTF-32 as
+required).
+
+One way to create an event in the format expected by this handler is to
+use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON
+string using the Gson#toJson(Object, Type)
+method. The type token to pass as the 2nd argument of this method
+for a list of events can be created by:
+
+.. code-block:: java
+
+  Type type = new TypeToken<List<JSONEvent>>() {}.getType();
 
 Legacy Sources
 ~~~~~~~~~~~~~~

http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java b/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java
new file mode 100644
index 0000000..e62f689
--- /dev/null
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/event/JSONEvent.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.event;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import org.apache.flume.Event;
+
+/**
+ * An {@link Event} that keeps its body as a String so that it can be
+ * represented as JSON text. The charset used to turn the body back into
+ * bytes is transient, defaults to UTF-8 and can be changed with
+ * {@link #setCharset(String)}.
+ */
+public class JSONEvent implements Event{
+  private Map<String, String> headers;
+  private String body;
+  private transient String charset = "UTF-8";
+
+  @Override
+  public Map<String, String> getHeaders() {
+    return headers;
+  }
+
+  @Override
+  public void setHeaders(Map<String, String> headers) {
+    this.headers = headers;
+  }
+
+  /**
+   * Returns the body encoded with the configured charset.
+   *
+   * @return the encoded body, or {@code null} if no body is set or the
+   *         configured charset name is not supported
+   */
+  @Override
+  public byte[] getBody() {
+    if(body != null) {
+      try {
+        return body.getBytes(charset);
+      } catch (UnsupportedEncodingException ex) {
+        //Should never happen
+        return null;
+      }
+    } else {
+      return null;
+    }
+
+  }
+
+  /**
+   * Stores the given bytes as the body text.
+   *
+   * @param body the raw body; {@code null} clears the body so that
+   *             {@link #getBody()} returns {@code null} instead of this
+   *             method failing with a NullPointerException
+   */
+  @Override
+  public void setBody(byte[] body) {
+    if (body == null) {
+      this.body = null;
+      return;
+    }
+    // NOTE(review): the bytes are decoded with the platform default charset,
+    // whereas getBody() encodes with `charset` - confirm this asymmetry is
+    // intended before relying on a byte-exact round trip.
+    this.body = new String(body);
+  }
+
+  /**
+   * Sets the name of the charset that {@link #getBody()} encodes with.
+   *
+   * @param charset charset name, e.g. "UTF-8"
+   */
+  public void setCharset(String charset) {
+    this.charset = charset;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/80fe9324/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6d0f6c5..fa0a9dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -703,6 +703,24 @@ limitations under the License.
         <version>2.5-20110124</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.mortbay.jetty</groupId>
+        <artifactId>jetty-util</artifactId>
+        <version>6.1.26</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.mortbay.jetty</groupId>
+        <artifactId>jetty</artifactId>
+        <version>6.1.26</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpclient</artifactId>
+        <version>4.2.1</version>
+      </dependency>
+
       <!--  Gson: Java to Json conversion -->
       <dependency>
         <groupId>com.google.code.gson</groupId>


Mime
View raw message