flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject flume git commit: FLUME-2718: HTTP Source to support generic Stream Handler
Date Fri, 15 Jan 2016 16:26:00 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.7 069558528 -> d6b4053e7


FLUME-2718: HTTP Source to support generic Stream Handler

(Hari via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d6b4053e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d6b4053e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d6b4053e

Branch: refs/heads/flume-1.7
Commit: d6b4053e76024d1f36b43f5c458783ba0849616b
Parents: 0695585
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Fri Jan 15 08:23:14 2016 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Fri Jan 15 08:25:53 2016 -0800

----------------------------------------------------------------------
 .../apache/flume/source/http/BLOBHandler.java   | 101 +++++++++++
 .../flume/source/http/TestBLOBHandler.java      | 170 +++++++++++++++++++
 2 files changed, 271 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d6b4053e/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
new file mode 100644
index 0000000..a816363
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/BLOBHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.http;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ * BLOBHandler for HTTPSource that accepts any binary stream of data as event.
+ *
+ */
+public class BLOBHandler implements HTTPSourceHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BLOBHandler.class);
+
+  private String commaSeparatedHeaders;
+
+  private String[] mandatoryHeaders;
+
+  public static final String MANDATORY_PARAMETERS = "mandatoryParameters";
+
+  public static final String DEFAULT_MANDATORY_PARAMETERS = "";
+
+  public static final String PARAMETER_SEPARATOR = ",";
+
+  /**
+   * {@inheritDoc}
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public List<Event> getEvents(HttpServletRequest request) throws Exception {
+    Map<String, String> headers = new HashMap<String, String>();
+
+    InputStream inputStream = request.getInputStream();
+
+    Map<String, String[]> parameters = request.getParameterMap();
+    for (String parameter : parameters.keySet()) {
+      String value = parameters.get(parameter)[0];
+      LOG.debug("Setting Header [Key, Value] as [{},{}] ",parameter, value);
+      headers.put(parameter, value);
+    }
+
+    for (String header : mandatoryHeaders) {
+      Preconditions.checkArgument(headers.containsKey(header),
+          "Please specify " + header + " parameter in the request.");
+    }
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try{
+      IOUtils.copy(inputStream, outputStream);
+      LOG.debug("Building an Event with stream of size -- {}", outputStream.size());
+      Event event = EventBuilder.withBody(outputStream.toByteArray(), headers);
+      event.setHeaders(headers);
+      List<Event> eventList = new ArrayList<Event>();
+      eventList.add(event);
+      return eventList;
+    }
+    finally {
+      outputStream.close();
+      inputStream.close();
+    }
+  }
+
+  @Override
+  public void configure(Context context) {
+    this.commaSeparatedHeaders = context.getString(MANDATORY_PARAMETERS, DEFAULT_MANDATORY_PARAMETERS);
+    this.mandatoryHeaders = commaSeparatedHeaders.split(PARAMETER_SEPARATOR);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/d6b4053e/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java
b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java
new file mode 100644
index 0000000..f770d51
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.http;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class TestBLOBHandler {
+
+  HTTPSourceHandler handler;
+
+  @Before
+  public void setUp() {
+    handler = new BLOBHandler();
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testCSVData() throws Exception {
+    Map requestParameterMap = new HashMap();
+    requestParameterMap.put("param1", new String[] { "value1" });
+    requestParameterMap.put("param2", new String[] { "value2" });
+
+    HttpServletRequest req = mock(HttpServletRequest.class);
+    final String csvData = "a,b,c";
+
+    ServletInputStream servletInputStream = new DelegatingServletInputStream(
+        new ByteArrayInputStream(csvData.getBytes()));
+
+    when(req.getInputStream()).thenReturn(servletInputStream);
+    when(req.getParameterMap()).thenReturn(requestParameterMap);
+
+    Context context = mock(Context.class);
+    when(
+        context.getString(BLOBHandler.MANDATORY_PARAMETERS,
+            BLOBHandler.DEFAULT_MANDATORY_PARAMETERS)).thenReturn(
+        "param1,param2");
+
+    handler.configure(context);
+    List<Event> deserialized = handler.getEvents(req);
+    assertEquals(1, deserialized.size());
+    Event e = deserialized.get(0);
+
+    assertEquals(new String(e.getBody()), csvData);
+    assertEquals(e.getHeaders().get("param1"), "value1");
+    assertEquals(e.getHeaders().get("param2"), "value2");
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testTabData() throws Exception {
+    Map requestParameterMap = new HashMap();
+    requestParameterMap.put("param1", new String[] { "value1" });
+
+    HttpServletRequest req = mock(HttpServletRequest.class);
+    final String tabData = "a\tb\tc";
+
+    ServletInputStream servletInputStream = new DelegatingServletInputStream(
+        new ByteArrayInputStream(tabData.getBytes()));
+
+    when(req.getInputStream()).thenReturn(servletInputStream);
+    when(req.getParameterMap()).thenReturn(requestParameterMap);
+
+    Context context = mock(Context.class);
+    when(
+        context.getString(BLOBHandler.MANDATORY_PARAMETERS,
+            BLOBHandler.DEFAULT_MANDATORY_PARAMETERS)).thenReturn("param1");
+
+    handler.configure(context);
+
+    List<Event> deserialized = handler.getEvents(req);
+    assertEquals(1, deserialized.size());
+    Event e = deserialized.get(0);
+
+    assertEquals(new String(e.getBody()), tabData);
+    assertEquals(e.getHeaders().get("param1"), "value1");
+  }
+
+  @SuppressWarnings({ "rawtypes" })
+  @Test(expected = IllegalArgumentException.class)
+  public void testMissingParameters() throws Exception {
+    Map requestParameterMap = new HashMap();
+
+    HttpServletRequest req = mock(HttpServletRequest.class);
+    final String tabData = "a\tb\tc";
+
+    ServletInputStream servletInputStream = new DelegatingServletInputStream(
+        new ByteArrayInputStream(tabData.getBytes()));
+
+    when(req.getInputStream()).thenReturn(servletInputStream);
+    when(req.getParameterMap()).thenReturn(requestParameterMap);
+
+    Context context = mock(Context.class);
+    when(
+        context.getString(BLOBHandler.MANDATORY_PARAMETERS,
+            BLOBHandler.DEFAULT_MANDATORY_PARAMETERS)).thenReturn("param1");
+
+    handler.configure(context);
+
+    handler.getEvents(req);
+
+  }
+
+  class DelegatingServletInputStream extends ServletInputStream {
+
+    private final InputStream sourceStream;
+
+    /**
+     * Create a DelegatingServletInputStream for the given source stream.
+     *
+     * @param sourceStream
+     *          the source stream (never <code>null</code>)
+     */
+    public DelegatingServletInputStream(InputStream sourceStream) {
+      this.sourceStream = sourceStream;
+    }
+
+    /**
+     * Return the underlying source stream (never <code>null</code>).
+     */
+    public final InputStream getSourceStream() {
+      return this.sourceStream;
+    }
+
+    public int read() throws IOException {
+      return this.sourceStream.read();
+    }
+
+    public void close() throws IOException {
+      super.close();
+      this.sourceStream.close();
+    }
+
+  }
+
+}


Mime
View raw message