Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E0837200D27 for ; Wed, 25 Oct 2017 21:36:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id DEC5A160BDA; Wed, 25 Oct 2017 19:36:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B1DB81609CE for ; Wed, 25 Oct 2017 21:36:46 +0200 (CEST) Received: (qmail 73659 invoked by uid 500); 25 Oct 2017 19:36:45 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 73650 invoked by uid 99); 25 Oct 2017 19:36:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Oct 2017 19:36:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B1E4FDF9FE; Wed, 25 Oct 2017 19:36:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: denes@apache.org To: commits@flume.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flume git commit: FLUME-2698. Upgrade Jetty Version Date: Wed, 25 Oct 2017 19:36:45 +0000 (UTC) archived-at: Wed, 25 Oct 2017 19:36:49 -0000 Repository: flume Updated Branches: refs/heads/trunk ed288acba -> 580f78134 FLUME-2698. Upgrade Jetty Version Update Jetty version to 9.4.6. Beside the version upgrade this patch exposes the new Jetty's configuration variables on the HTTPSource to provide users the possibility of fine-tuning the HTTPSource. This closes #158 Reviewers: Ferenc Szabo, Mike Percy, Miklos Csanady, Denes Arvay (Tristan Stevens via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/580f7813 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/580f7813 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/580f7813 Branch: refs/heads/trunk Commit: 580f781341b42672f234af069b9190ff1cc00dca Parents: ed288ac Author: Tristan Stevens Authored: Wed Oct 25 10:56:42 2017 -0700 Committer: Denes Arvay Committed: Wed Oct 25 12:28:12 2017 -0700 ---------------------------------------------------------------------- flume-ng-core/pom.xml | 21 +- .../instrumentation/http/HTTPMetricsServer.java | 24 +- .../apache/flume/source/http/HTTPSource.java | 118 ++++---- .../http/HTTPSourceConfigurationConstants.java | 1 + .../flume/tools/FlumeBeanConfigurator.java | 136 +++++++++ .../flume/tools/HTTPServerConstraintUtil.java | 22 +- .../http/TestHTTPMetricsServer.java | 1 + .../http/FlumeHttpServletRequestWrapper.java | 101 ++++++- .../flume/source/http/TestBLOBHandler.java | 13 + .../flume/source/http/TestHTTPSource.java | 222 +++++++++++---- .../flume/tools/TestFlumeConfigurator.java | 284 +++++++++++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 56 ++-- .../flume-avro-source/pom.xml | 5 - .../flume-thrift-source/pom.xml | 5 - flume-ng-sinks/flume-http-sink/pom.xml | 1 - .../FlumeHttpServletRequestWrapper.java | 111 +++++++- flume-ng-sources/flume-scribe-source/pom.xml | 5 - pom.xml | 33 ++- 18 files changed, 962 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-core/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml index d318df2..44d9ef6 100644 --- a/flume-ng-core/pom.xml +++ b/flume-ng-core/pom.xml @@ -341,18 +341,23 @@ limitations under the License. - org.mortbay.jetty - servlet-api + org.eclipse.jetty + jetty-servlet - org.mortbay.jetty + org.eclipse.jetty jetty-util - org.mortbay.jetty - jetty + org.eclipse.jetty + jetty-server + + + + org.eclipse.jetty + jetty-jmx @@ -367,6 +372,12 @@ limitations under the License. + org.apache.httpcomponents + httpcore + test + + + org.mockito mockito-all test http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java index 921a1f7..ebb01fe 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.flume.instrumentation.http; import com.google.gson.Gson; @@ -29,11 +30,12 @@ import javax.servlet.http.HttpServletResponse; import org.apache.flume.Context; import org.apache.flume.instrumentation.MonitorService; import org.apache.flume.instrumentation.util.JMXPollUtil; -import org.mortbay.jetty.Connector; -import org.mortbay.jetty.Request; -import org.mortbay.jetty.Server; -import org.mortbay.jetty.handler.AbstractHandler; -import org.mortbay.jetty.nio.SelectChannelConnector; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.AbstractHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,10 +62,12 @@ public class HTTPMetricsServer implements MonitorService { jettyServer = new Server(); //We can use Contexts etc if we have many urls to handle. For one url, //specifying a handler directly is the most efficient. - SelectChannelConnector connector = new SelectChannelConnector(); + HttpConfiguration httpConfiguration = new HttpConfiguration(); + ServerConnector connector = new ServerConnector(jettyServer, + new HttpConnectionFactory(httpConfiguration)); connector.setReuseAddress(true); connector.setPort(port); - jettyServer.setConnectors(new Connector[] {connector}); + jettyServer.addConnector(connector); jettyServer.setHandler(new HTTPMetricsHandler()); try { jettyServer.start(); @@ -98,10 +102,9 @@ public class HTTPMetricsServer implements MonitorService { Gson gson = new Gson(); @Override - public void handle(String target, + public void handle(String target, Request r1, HttpServletRequest request, - HttpServletResponse response, - int dispatch) throws IOException, ServletException { + HttpServletResponse response) throws IOException, ServletException { // /metrics is the only place to pull metrics. //If we want to use any other url for something else, we should make sure //that for metrics only /metrics is used to prevent backward @@ -135,5 +138,6 @@ public class HTTPMetricsServer implements MonitorService { response.flushBuffer(); //Not handling the request returns a Not found error page. } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java index 38bdfda..ce6545a 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java @@ -26,22 +26,28 @@ import org.apache.flume.EventDrivenSource; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.source.AbstractSource; +import org.apache.flume.tools.FlumeBeanConfigurator; import org.apache.flume.tools.HTTPServerConstraintUtil; -import org.mortbay.jetty.Connector; -import org.mortbay.jetty.Server; -import org.mortbay.jetty.nio.SelectChannelConnector; -import org.mortbay.jetty.security.SslSocketConnector; -import org.mortbay.jetty.servlet.ServletHolder; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.jmx.MBeanContainer; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLServerSocket; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; -import java.net.ServerSocket; -import java.util.ArrayList; +import java.lang.management.ManagementFactory; import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; @@ -100,9 +106,11 @@ public class HTTPSource extends AbstractSource implements private volatile Boolean sslEnabled; private final List excludedProtocols = new LinkedList(); + private Context sourceContext; @Override public void configure(Context context) { + sourceContext = context; try { // SSL related config sslEnabled = context.getBoolean(HTTPSourceConfigurationConstants.SSL_ENABLED, false); @@ -141,15 +149,12 @@ public class HTTPSource extends AbstractSource implements } } - - @SuppressWarnings("unchecked") Class clazz = (Class) Class.forName(handlerClassName); handler = clazz.getDeclaredConstructor().newInstance(); - //ref: http://docs.codehaus.org/display/JETTY/Embedding+Jetty - //ref: http://jetty.codehaus.org/jetty/jetty-6/apidocs/org/mortbay/jetty/servlet/Context.html + Map subProps = context.getSubProperties( HTTPSourceConfigurationConstants.CONFIG_HANDLER_PREFIX); @@ -170,47 +175,62 @@ public class HTTPSource extends AbstractSource implements } } - private void checkHostAndPort() { - Preconditions.checkState(host != null && !host.isEmpty(), - "HTTPSource hostname specified is empty"); - Preconditions.checkNotNull(port, "HTTPSource requires a port number to be" - + " specified"); - } - @Override public void start() { Preconditions.checkState(srv == null, "Running HTTP Server found in source: " + getName() + " before I started one." + "Will not attempt to start."); - srv = new Server(); + QueuedThreadPool threadPool = new QueuedThreadPool(); + if (sourceContext.getSubProperties("QueuedThreadPool.").size() > 0) { + FlumeBeanConfigurator.setConfigurationFields(threadPool, sourceContext); + } + srv = new Server(threadPool); - // Connector Array - Connector[] connectors = new Connector[1]; + //Register with JMX for advanced monitoring + MBeanContainer mbContainer = new MBeanContainer(ManagementFactory.getPlatformMBeanServer()); + srv.addEventListener(mbContainer); + srv.addBean(mbContainer); + HttpConfiguration httpConfiguration = new HttpConfiguration(); + httpConfiguration.addCustomizer(new SecureRequestCustomizer()); + + FlumeBeanConfigurator.setConfigurationFields(httpConfiguration, sourceContext); + ServerConnector connector; if (sslEnabled) { - SslSocketConnector sslSocketConnector = new HTTPSourceSocketConnector(excludedProtocols); - sslSocketConnector.setKeystore(keyStorePath); - sslSocketConnector.setKeyPassword(keyStorePassword); - sslSocketConnector.setReuseAddress(true); - connectors[0] = sslSocketConnector; + SslContextFactory sslCtxFactory = new SslContextFactory(); + FlumeBeanConfigurator.setConfigurationFields(sslCtxFactory, sourceContext); + sslCtxFactory.setExcludeProtocols(excludedProtocols.toArray(new String[0])); + sslCtxFactory.setKeyStorePath(keyStorePath); + sslCtxFactory.setKeyStorePassword(keyStorePassword); + + httpConfiguration.setSecurePort(port); + httpConfiguration.setSecureScheme("https"); + + connector = new ServerConnector(srv, + new SslConnectionFactory(sslCtxFactory,HttpVersion.HTTP_1_1.asString()), + new HttpConnectionFactory(httpConfiguration)); } else { - SelectChannelConnector connector = new SelectChannelConnector(); - connector.setReuseAddress(true); - connectors[0] = connector; + connector = new ServerConnector(srv, new HttpConnectionFactory(httpConfiguration)); } - connectors[0].setHost(host); - connectors[0].setPort(port); - srv.setConnectors(connectors); + connector.setPort(port); + connector.setHost(host); + connector.setReuseAddress(true); + + FlumeBeanConfigurator.setConfigurationFields(connector, sourceContext); + + srv.addConnector(connector); + try { - org.mortbay.jetty.servlet.Context root = new org.mortbay.jetty.servlet.Context( - srv, "/", org.mortbay.jetty.servlet.Context.SESSIONS); - root.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/"); - HTTPServerConstraintUtil.enforceConstraints(root); + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + srv.setHandler(context); + + context.addServlet(new ServletHolder(new FlumeHTTPServlet()),"/"); + context.setSecurityHandler(HTTPServerConstraintUtil.enforceConstraints()); srv.start(); - Preconditions.checkArgument(srv.getHandler().equals(root)); } catch (Exception ex) { LOG.error("Error while starting HTTPSource. Exception follows.", ex); Throwables.propagate(ex); @@ -288,26 +308,4 @@ public class HTTPSource extends AbstractSource implements doPost(request, response); } } - - private static class HTTPSourceSocketConnector extends SslSocketConnector { - private final List excludedProtocols; - - HTTPSourceSocketConnector(List excludedProtocols) { - this.excludedProtocols = excludedProtocols; - } - - @Override - public ServerSocket newServerSocket(String host, int port, int backlog) throws IOException { - SSLServerSocket socket = (SSLServerSocket)super.newServerSocket(host, port, backlog); - String[] protocols = socket.getEnabledProtocols(); - List newProtocols = new ArrayList(protocols.length); - for (String protocol: protocols) { - if (!excludedProtocols.contains(protocol)) { - newProtocols.add(protocol); - } - } - socket.setEnabledProtocols(newProtocols.toArray(new String[newProtocols.size()])); - return socket; - } - } } http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java index 86caf7d..f1f7a90 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java @@ -27,6 +27,7 @@ public class HTTPSourceConfigurationConstants { public static final String CONFIG_HANDLER = "handler"; public static final String CONFIG_HANDLER_PREFIX = CONFIG_HANDLER + "."; + public static final String CONFIG_BIND = "bind"; public static final String DEFAULT_BIND = "0.0.0.0"; http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-core/src/main/java/org/apache/flume/tools/FlumeBeanConfigurator.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/tools/FlumeBeanConfigurator.java b/flume-ng-core/src/main/java/org/apache/flume/tools/FlumeBeanConfigurator.java new file mode 100644 index 0000000..b4f7273 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/tools/FlumeBeanConfigurator.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.tools; + +import java.lang.reflect.Method; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.flume.Context; +import org.apache.flume.conf.ConfigurationException; + +/** + * Utility class to enable runtime configuration of Java objects using provided + * Flume context objects (or equivalent). The methods use reflection to identify + * Fields on the configurable object and then looks for matching properties in + * the provided properties bundle. + * + */ +public class FlumeBeanConfigurator { + + /** + * Utility method that will set properties on a Java bean (Object configurable) + * based on the provided properties bundle. + * If there is a type issue, or an access problem + * then a ConfigurationException will be thrown. + * + * @param configurable Any properties must be modifiable via setter methods. + * @param properties Map<String, String> + * @throws ConfigurationException + */ + public static void setConfigurationFields(Object configurable, Map properties) + throws ConfigurationException { + Class clazz = configurable.getClass(); + + for (Method method : clazz.getMethods()) { + String methodName = method.getName(); + if (methodName.startsWith("set") && method.getParameterTypes().length == 1) { + String fieldName = methodName.substring(3); + + String value = properties.get(StringUtils.uncapitalize(fieldName)); + if (value != null) { + + Class fieldType = method.getParameterTypes()[0];; + try { + if (fieldType.equals(String.class)) { + method.invoke(configurable, value); + } else if (fieldType.equals(boolean.class)) { + method.invoke(configurable, Boolean.parseBoolean(value)); + } else if (fieldType.equals(short.class)) { + method.invoke(configurable, Short.parseShort(value)); + } else if (fieldType.equals(long.class)) { + method.invoke(configurable, Long.parseLong(value)); + } else if (fieldType.equals(float.class)) { + method.invoke(configurable, Float.parseFloat(value)); + } else if (fieldType.equals(int.class)) { + method.invoke(configurable, Integer.parseInt(value)); + } else if (fieldType.equals(double.class)) { + method.invoke(configurable, Double.parseDouble(value)); + } else if (fieldType.equals(char.class)) { + method.invoke(configurable, value.charAt(0)); + } else if (fieldType.equals(byte.class)) { + method.invoke(configurable, Byte.parseByte(value)); + } else if (fieldType.equals(String[].class)) { + method.invoke(configurable, (Object)value.split("\\s+")); + } else { + throw new ConfigurationException( + "Unable to configure component due to an unsupported type on field: " + + fieldName); + } + } catch (Exception ex) { + if (ex instanceof ConfigurationException) { + throw (ConfigurationException)ex; + } else { + throw new ConfigurationException("Unable to configure component: ", ex); + } + } + } + } + } + } + + /** + * Utility method that will set properties on a Java bean (Object configurable) + * based on the provided Context. + * N.B. This method will take the Flume Context and look for sub-properties named after the + * class name of the configurable object. + * If there is a type issue, or an access problem + * then a ConfigurationException will be thrown. + * + * @param configurable Any properties must be modifiable via setter methods. + * @param context + * @throws ConfigurationException + */ + public static void setConfigurationFields(Object configurable, Context context) + throws ConfigurationException { + Class clazz = configurable.getClass(); + Map properties = context.getSubProperties(clazz.getSimpleName() + "."); + setConfigurationFields(configurable, properties); + } + + /** + * Utility method that will set properties on a Java bean (Object configurable) + * based on the provided Context. + * N.B. This method will take the Flume Context and look for sub-properties named after the + * subPropertiesPrefix String. + * If there is a type issue, or an access problem + * then a ConfigurationException will be thrown. + * + * @param configurable Object: Any properties must be modifiable via setter methods. + * @param context org.apache.flume.Context; + * @param subPropertiesPrefix String + * @throws ConfigurationException + */ + public static void setConfigurationFields(Object configurable, Context context, + String subPropertiesPrefix) throws ConfigurationException { + Map properties = context.getSubProperties(subPropertiesPrefix); + setConfigurationFields(configurable, properties); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java b/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java index 479cfc4..8abf203 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java +++ b/flume-ng-core/src/main/java/org/apache/flume/tools/HTTPServerConstraintUtil.java @@ -18,15 +18,14 @@ */ package org.apache.flume.tools; -import org.mortbay.jetty.security.Constraint; -import org.mortbay.jetty.security.ConstraintMapping; -import org.mortbay.jetty.security.SecurityHandler; -import org.mortbay.jetty.servlet.Context; +import org.eclipse.jetty.util.security.Constraint; +import org.eclipse.jetty.security.ConstraintMapping; +import org.eclipse.jetty.security.ConstraintSecurityHandler; // Most of the code in this class is copied from HBASE-10473 /** - * Utility class to impose constraints on Jetty HTTP servers + * Utility class to define constraints on Jetty HTTP servers */ public class HTTPServerConstraintUtil { @@ -36,12 +35,10 @@ public class HTTPServerConstraintUtil { } /** - * Impose constraints on the {@linkplain org.mortbay.jetty.servlet.Context} - * passed in. - * @param ctx - {@linkplain org.mortbay.jetty.servlet.Context} to impose - * constraints on. + * Generate constraints for the Flume HTTP Source + * @return ConstraintSecurityHandler for use with Jetty servlet */ - public static void enforceConstraints(Context ctx) { + public static ConstraintSecurityHandler enforceConstraints() { Constraint c = new Constraint(); c.setAuthenticate(true); @@ -55,8 +52,9 @@ public class HTTPServerConstraintUtil { cmo.setMethod("OPTIONS"); cmo.setPathSpec("/*"); - SecurityHandler sh = new SecurityHandler(); + ConstraintSecurityHandler sh = new ConstraintSecurityHandler(); sh.setConstraintMappings(new ConstraintMapping[]{cmt, cmo}); - ctx.addHandler(sh); + + return sh; } } http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java index 09d419f..bf0bf0e 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.flume.instrumentation.http; import com.google.gson.Gson; http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java index 475d92f..fe18222 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletRequestWrapper.java @@ -23,14 +23,25 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.security.Principal; +import java.util.Collection; import java.util.Enumeration; import java.util.Locale; import java.util.Map; + +import javax.servlet.AsyncContext; +import javax.servlet.DispatcherType; import javax.servlet.RequestDispatcher; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; import javax.servlet.ServletInputStream; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; +import javax.servlet.http.HttpUpgradeHandler; +import javax.servlet.http.Part; /** * @@ -73,12 +84,12 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { } @Override - public Enumeration getHeaders(String name) { + public Enumeration getHeaders(String name) { throw new UnsupportedOperationException("Not supported yet."); } @Override - public Enumeration getHeaderNames() { + public Enumeration getHeaderNames() { throw new UnsupportedOperationException("Not supported yet."); } @@ -183,7 +194,7 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { } @Override - public Enumeration getAttributeNames() { + public Enumeration getAttributeNames() { throw new UnsupportedOperationException("Not supported yet."); } @@ -218,7 +229,7 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { } @Override - public Enumeration getParameterNames() { + public Enumeration getParameterNames() { throw new UnsupportedOperationException("Not supported yet."); } @@ -228,7 +239,7 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { } @Override - public Map getParameterMap() { + public Map getParameterMap() { throw new UnsupportedOperationException("Not supported yet."); } @@ -283,7 +294,7 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { } @Override - public Enumeration getLocales() { + public Enumeration getLocales() { throw new UnsupportedOperationException("Not supported yet."); } @@ -321,4 +332,82 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { public int getLocalPort() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public AsyncContext getAsyncContext() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public long getContentLengthLong() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public DispatcherType getDispatcherType() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public ServletContext getServletContext() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isAsyncStarted() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isAsyncSupported() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public AsyncContext startAsync() throws IllegalStateException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public AsyncContext startAsync(ServletRequest arg0, ServletResponse arg1) + throws IllegalStateException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean authenticate(HttpServletResponse arg0) + throws IOException, ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public String changeSessionId() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public Part getPart(String arg0) throws IOException, ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public Collection getParts() throws IOException, ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void login(String arg0, String arg1) throws ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void logout() throws ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public T upgrade(Class arg0) + throws IOException, ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java index f770d51..7d46a78 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestBLOBHandler.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; import javax.servlet.http.HttpServletRequest; @@ -165,6 +166,18 @@ public class TestBLOBHandler { this.sourceStream.close(); } + public boolean isFinished() { + throw new UnsupportedOperationException("Not supported yet."); + } + + public boolean isReady() { + throw new UnsupportedOperationException("Not supported yet."); + } + + public void setReadListener(ReadListener arg0) { + throw new UnsupportedOperationException("Not supported yet."); + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java index 3ad8282..bb61dce 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java @@ -17,7 +17,6 @@ */ package org.apache.flume.source.http; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; @@ -32,19 +31,26 @@ import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; import org.apache.flume.event.JSONEvent; -import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpOptions; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.client.methods.HttpTrace; -import org.apache.http.conn.ssl.SSLSocketFactory; +import org.apache.http.HttpResponse; +import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.Query; +import javax.management.QueryExp; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; @@ -54,6 +60,7 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import javax.servlet.http.HttpServletResponse; import java.io.IOException; +import java.lang.management.ManagementFactory; import java.lang.reflect.Type; import java.net.HttpURLConnection; import java.net.InetAddress; @@ -64,9 +71,11 @@ import java.net.UnknownHostException; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import static org.fest.reflect.core.Reflection.field; @@ -77,12 +86,12 @@ public class TestHTTPSource { private static HTTPSource source; private static HTTPSource httpsSource; -// private static Channel httpsChannel; private static Channel channel; + private static Channel httpsChannel; private static int selectedPort; private static int sslPort; - DefaultHttpClient httpClient; + HttpClient httpClient; HttpPost postRequest; private static int findFreePort() throws IOException { @@ -92,64 +101,67 @@ public class TestHTTPSource { return port; } - @BeforeClass - public static void setUpClass() throws Exception { - selectedPort = findFreePort(); - - source = new HTTPSource(); - channel = new MemoryChannel(); - - httpsSource = new HTTPSource(); - httpsSource.setName("HTTPS Source"); - + private static Context getDefaultNonSecureContext(int selectedPort) throws IOException { Context ctx = new Context(); - ctx.put("capacity", "100"); - Configurables.configure(channel, ctx); - - List channels = new ArrayList(1); - channels.add(channel); - - ChannelSelector rcs = new ReplicatingChannelSelector(); - rcs.setChannels(channels); - - source.setChannelProcessor(new ChannelProcessor(rcs)); - - channel.start(); - - httpsSource.setChannelProcessor(new ChannelProcessor(rcs)); - - // HTTP context - Context context = new Context(); - - context.put("port", String.valueOf(selectedPort)); - context.put("host", "0.0.0.0"); + ctx.put(HTTPSourceConfigurationConstants.CONFIG_BIND, "0.0.0.0"); + ctx.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(selectedPort)); + ctx.put("QueuedThreadPool.MaxThreads", "100"); + return ctx; + } - // SSL context props + private static Context getDefaultSecureContext(int sslPort) throws IOException { Context sslContext = new Context(); + sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(sslPort)); sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true"); - sslPort = findFreePort(); - sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, - String.valueOf(sslPort)); sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, "password"); sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, "src/test/resources/jettykeystore"); + return sslContext; + } - Configurables.configure(source, context); - Configurables.configure(httpsSource, sslContext); + @BeforeClass + public static void setUpClass() throws Exception { + source = new HTTPSource(); + channel = new MemoryChannel(); + selectedPort = findFreePort(); + configureSourceAndChannel(source, channel, getDefaultNonSecureContext(selectedPort)); + channel.start(); source.start(); + + httpsSource = new HTTPSource(); + httpsChannel = new MemoryChannel(); + sslPort = findFreePort(); + configureSourceAndChannel(httpsSource, httpsChannel, getDefaultSecureContext(sslPort)); + httpsChannel.start(); httpsSource.start(); } + private static void configureSourceAndChannel( + HTTPSource source, Channel channel, Context context + ) { + Context channelContext = new Context(); + channelContext.put("capacity", "100"); + Configurables.configure(channel, channelContext); + Configurables.configure(source, context); + + ChannelSelector rcs1 = new ReplicatingChannelSelector(); + rcs1.setChannels(Collections.singletonList(channel)); + + source.setChannelProcessor(new ChannelProcessor(rcs1)); + } + @AfterClass public static void tearDownClass() throws Exception { source.stop(); channel.stop(); httpsSource.stop(); + httpsChannel.stop(); } @Before public void setUp() { - httpClient = new DefaultHttpClient(); + HttpClientBuilder builder = HttpClientBuilder.create(); + httpClient = builder.build(); postRequest = new HttpPost("http://0.0.0.0:" + selectedPort); } @@ -271,6 +283,88 @@ public class TestHTTPSource { tx.close(); } + /** + * First test that the unconfigured behaviour is as-expected, then add configurations + * to a new channel and observe the difference. + * For some of the properties, the most convenient way to test is using the MBean interface + * We test all of HttpConfiguration, ServerConnector, QueuedThreadPool and SslContextFactory + * sub-configurations (but not all properties) + */ + @Test + public void testConfigurables() throws Exception { + StringEntity input = new StringEntity("[{\"headers\" : {\"a\": \"b\"},\"body\":" + + " \"random_body\"}]"); + input.setContentType("application/json"); + postRequest.setEntity(input); + + HttpResponse resp = httpClient.execute(postRequest); + + // Testing default behaviour (to not provided X-Powered-By, but to provide Server headers) + Assert.assertTrue(resp.getHeaders("X-Powered-By").length == 0); + Assert.assertTrue(resp.getHeaders("Server").length == 1); + + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e = channel.take(); + Assert.assertNotNull(e); + tx.commit(); + tx.close(); + Assert.assertTrue(findMBeans("org.eclipse.jetty.util.thread:type=queuedthreadpool,*", + "maxThreads", 123).size() == 0); + Assert.assertTrue(findMBeans("org.eclipse.jetty.server:type=serverconnector,*", + "acceptQueueSize", 22).size() == 0); + + int newPort = findFreePort(); + Context configuredSourceContext = getDefaultNonSecureContext(newPort); + configuredSourceContext.put("HttpConfiguration.sendServerVersion", "false"); + configuredSourceContext.put("HttpConfiguration.sendXPoweredBy", "true"); + configuredSourceContext.put("ServerConnector.acceptQueueSize", "22"); + configuredSourceContext.put("QueuedThreadPool.maxThreads", "123"); + + HTTPSource newSource = new HTTPSource(); + Channel newChannel = new MemoryChannel(); + configureSourceAndChannel(newSource, newChannel, configuredSourceContext); + newChannel.start(); + newSource.start(); + + HttpPost newPostRequest = new HttpPost("http://0.0.0.0:" + newPort); + + resp = httpClient.execute(newPostRequest); + Assert.assertTrue(resp.getHeaders("X-Powered-By").length > 0); + Assert.assertTrue(resp.getHeaders("Server").length == 0); + Assert.assertTrue(findMBeans("org.eclipse.jetty.util.thread:type=queuedthreadpool,*", + "maxThreads", 123).size() == 1); + Assert.assertTrue(findMBeans("org.eclipse.jetty.server:type=serverconnector,*", + "acceptQueueSize", 22).size() == 1); + + newSource.stop(); + newChannel.stop(); + + //Configure SslContextFactory with junk protocols (expect failure) + newPort = findFreePort(); + configuredSourceContext = getDefaultSecureContext(newPort); + configuredSourceContext.put("SslContextFactory.IncludeProtocols", "abc def"); + + newSource = new HTTPSource(); + newChannel = new MemoryChannel(); + + configureSourceAndChannel(newSource, newChannel, configuredSourceContext); + + newChannel.start(); + newSource.start(); + + newPostRequest = new HttpPost("http://0.0.0.0:" + newPort); + try { + doTestHttps(null, newPort); + //We are testing that this fails because we've deliberately configured the wrong protocols + Assert.assertTrue(false); + } catch (AssertionError ex) { + //no-op + } + newSource.stop(); + newChannel.stop(); + } + @Test public void testFullChannel() throws Exception { HttpResponse response = putWithEncoding("UTF-8", 150).response; @@ -293,6 +387,14 @@ public class TestHTTPSource { } @Test + public void testMBeans() throws Exception { + MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); + ObjectName objectName = new ObjectName("org.eclipse.jetty.*:*"); + Set queryMBeans = mbeanServer.queryMBeans(objectName, null); + Assert.assertTrue(queryMBeans.size() > 0); + } + + @Test public void testHandlerThrowingException() throws Exception { //This will cause the handler to throw an //UnsupportedCharsetException. @@ -301,10 +403,17 @@ public class TestHTTPSource { response.getStatusLine().getStatusCode()); } + private Set findMBeans(String name, String attribute, int value) + throws MalformedObjectNameException { + MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); + ObjectName objectName = new ObjectName(name); + QueryExp q = Query.eq(Query.attr(attribute), Query.value(value)); + return mbeanServer.queryMBeans(objectName, q); + } private ResultWrapper putWithEncoding(String encoding, int n) throws Exception { Type listType = new TypeToken>() {}.getType(); - List events = Lists.newArrayList(); + List events = new ArrayList(); Random rand = new Random(); for (int i = 0; i < n; i++) { Map input = Maps.newHashMap(); @@ -327,18 +436,18 @@ public class TestHTTPSource { @Test public void testHttps() throws Exception { - doTestHttps(null); + doTestHttps(null, sslPort); } @Test (expected = javax.net.ssl.SSLHandshakeException.class) public void testHttpsSSLv3() throws Exception { - doTestHttps("SSLv3"); + doTestHttps("SSLv3", sslPort); } - public void doTestHttps(String protocol) throws Exception { + public void doTestHttps(String protocol, int port) throws Exception { Type listType = new TypeToken>() { }.getType(); - List events = Lists.newArrayList(); + List events = new ArrayList(); Random rand = new Random(); for (int i = 0; i < 10; i++) { Map input = Maps.newHashMap(); @@ -354,6 +463,7 @@ public class TestHTTPSource { Gson gson = new Gson(); String json = gson.toJson(events, listType); HttpsURLConnection httpsURLConnection = null; + Transaction transaction = null; try { TrustManager[] trustAllCerts = { new X509TrustManager() { @@ -396,8 +506,8 @@ public class TestHTTPSource { factory = sc.getSocketFactory(); } HttpsURLConnection.setDefaultSSLSocketFactory(factory); - HttpsURLConnection.setDefaultHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); - URL sslUrl = new URL("https://0.0.0.0:" + sslPort); + HttpsURLConnection.setDefaultHostnameVerifier(NoopHostnameVerifier.INSTANCE); + URL sslUrl = new URL("https://0.0.0.0:" + port); httpsURLConnection = (HttpsURLConnection) sslUrl.openConnection(); httpsURLConnection.setDoInput(true); httpsURLConnection.setDoOutput(true); @@ -407,17 +517,19 @@ public class TestHTTPSource { int statusCode = httpsURLConnection.getResponseCode(); Assert.assertEquals(200, statusCode); - Transaction transaction = channel.getTransaction(); + transaction = httpsChannel.getTransaction(); transaction.begin(); for (int i = 0; i < 10; i++) { - Event e = channel.take(); + Event e = httpsChannel.take(); Assert.assertNotNull(e); Assert.assertEquals(String.valueOf(i), e.getHeaders().get("MsgNum")); } - transaction.commit(); - transaction.close(); } finally { + if (transaction != null) { + transaction.commit(); + transaction.close(); + } httpsURLConnection.disconnect(); } } @@ -426,7 +538,7 @@ public class TestHTTPSource { public void testHttpsSourceNonHttpsClient() throws Exception { Type listType = new TypeToken>() { }.getType(); - List events = Lists.newArrayList(); + List events = new ArrayList(); Random rand = new Random(); for (int i = 0; i < 10; i++) { Map input = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-core/src/test/java/org/apache/flume/tools/TestFlumeConfigurator.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/tools/TestFlumeConfigurator.java b/flume-ng-core/src/test/java/org/apache/flume/tools/TestFlumeConfigurator.java new file mode 100644 index 0000000..26ecb0a --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/tools/TestFlumeConfigurator.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.tools; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.flume.Context; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import junit.framework.Assert; + +public class TestFlumeConfigurator { + private String testPrefix = "TestBean."; + + /** + * Test configuring an int. Creates a random int greater than zero + * and then uses FlumeBeanConfigurator to test it can be set. + */ + @Test + public void testIntConfiguration() { + + Map props = new HashMap(); + Random random = new Random(); + int intValue = random.nextInt(Integer.MAX_VALUE - 1 ) + 1; + props.put(testPrefix + "testInt", Integer.toString(intValue)); + Context context = new Context(props); + + TestBean bean = new TestBean(); + Assert.assertEquals(0, bean.getTestInt()); + + FlumeBeanConfigurator.setConfigurationFields(bean, context); + Assert.assertEquals(intValue, bean.getTestInt()); + } + + /** + * Test configuring an short. Creates a random short greater than zero + * and then uses FlumeBeanConfigurator to test it can be set. + */ + @Test + public void testShortConfiguration() { + + Map props = new HashMap(); + Random random = new Random(); + short shortValue = (short)(random.nextInt(Short.MAX_VALUE - 1 ) + 1); + props.put(testPrefix + "testShort", Short.toString(shortValue)); + Context context = new Context(props); + + TestBean bean = new TestBean(); + Assert.assertEquals(0, bean.getTestShort()); + + FlumeBeanConfigurator.setConfigurationFields(bean, context); + Assert.assertEquals(shortValue, bean.getTestShort()); + } + + /** + * Test configuring a long. Creates a random long greater than Integer.MAX_VALUE + * and then uses FlumeBeanConfigurator to test it can be set. + */ + + @Test + public void testLongConfiguration() { + + Map props = new HashMap(); + long longValue = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE, Long.MAX_VALUE); + props.put(testPrefix + "testLong", Long.toString(longValue)); + Context context = new Context(props); + + TestBean bean = new TestBean(); + Assert.assertEquals(0, bean.getTestLong()); + + FlumeBeanConfigurator.setConfigurationFields(bean, context); + Assert.assertEquals(longValue, bean.getTestLong()); + } + + /** + * Test configuring an byte. Creates a random byte greater than zero + * and then uses FlumeBeanConfigurator to test it can be set. + */ + @Test + public void testByteConfiguration() { + + Map props = new HashMap(); + Random random = new Random(); + byte byteValue = (byte)(random.nextInt(Byte.MAX_VALUE - 1 ) + 1); + props.put(testPrefix + "testByte", Byte.toString(byteValue)); + Context context = new Context(props); + + TestBean bean = new TestBean(); + Assert.assertEquals(0, bean.getTestByte()); + + FlumeBeanConfigurator.setConfigurationFields(bean, context); + Assert.assertEquals(byteValue, bean.getTestByte()); + } + + /** + * Test configuring an boolean. + */ + @Test + public void testBooleanConfiguration() { + + Map props = new HashMap(); + props.put(testPrefix + "testBoolean", "true"); + Context context = new Context(props); + + TestBean bean = new TestBean(); + Assert.assertEquals(false, bean.getTestBoolean()); + + FlumeBeanConfigurator.setConfigurationFields(bean, context); + Assert.assertEquals(true, bean.getTestBoolean()); + } + + /** + * Test configuring an double. Creates a random double + * and then uses FlumeBeanConfigurator to test it can be set. + */ + @Test + public void testDoubleConfiguration() { + + Map props = new HashMap(); + Random random = new Random(); + double doubleValue = random.nextDouble(); + props.put(testPrefix + "testDouble", Double.toString(doubleValue)); + + Context context = new Context(props); + + TestBean bean = new TestBean(); + Assert.assertEquals(0.0d, bean.getTestDouble()); + + FlumeBeanConfigurator.setConfigurationFields(bean, context); + Assert.assertEquals(doubleValue, bean.getTestDouble()); + } + + /** + * Test configuring an float. Creates a random float + * and then uses FlumeBeanConfigurator to test it can be set. + */ + + @Test + public void testFloatConfiguration() { + + Map props = new HashMap(); + Random random = new Random(); + float floatValue = random.nextFloat(); + props.put(testPrefix + "testFloat", Float.toString(floatValue)); + + Context context = new Context(props); + + TestBean bean = new TestBean(); + Assert.assertEquals(0.0f, bean.getTestFloat()); + + FlumeBeanConfigurator.setConfigurationFields(bean, context); + Assert.assertEquals(floatValue, bean.getTestFloat()); + } + + /** + * Test configuring a String. Creates a random String (UUID in this case) + * and then uses FlumeBeanConfigurator to test it can be set. + */ + @Test + public void testStringConfiguration() { + + Map props = new HashMap(); + String stringValue = UUID.randomUUID().toString(); + props.put(testPrefix + "testString", stringValue); + Context context = new Context(props); + + TestBean bean = new TestBean(); + Assert.assertEquals("", bean.getTestString()); + + FlumeBeanConfigurator.setConfigurationFields(bean, context); + Assert.assertEquals(stringValue, bean.getTestString()); + } + + /** + * Test that is is not possible to configure using private setters. + */ + @Test + public void testPrivateConfiguration() { + + Map props = new HashMap(); + Random random = new Random(); + int intValue = random.nextInt(Integer.MAX_VALUE - 1 ) + 1; + props.put(testPrefix + "privateInt", Integer.toString(intValue)); + Context context = new Context(props); + + TestBean bean = new TestBean(); + Assert.assertEquals(0, bean.getPrivateInt()); + + FlumeBeanConfigurator.setConfigurationFields(bean, context); + Assert.assertTrue(bean.getPrivateInt() != intValue); + } + + public class TestBean { + private int testInt = 0; + private short testShort = 0; + private long testLong = 0; + private byte testByte = 0; + private boolean testBoolean = false; + private float testFloat = 0f; + private double testDouble = 0d; + private String testString = ""; + private int privateInt = 0; + + public int getTestInt() { + return testInt; + } + public void setTestInt(int testInt) { + this.testInt = testInt; + } + public short getTestShort() { + return testShort; + } + public void setTestShort(short testShort) { + this.testShort = testShort; + } + public long getTestLong() { + return testLong; + } + public void setTestLong(long testLong) { + this.testLong = testLong; + } + public byte getTestByte() { + return testByte; + } + public void setTestByte(byte testByte) { + this.testByte = testByte; + } + public boolean getTestBoolean() { + return testBoolean; + } + public void setTestBoolean(boolean testBoolean) { + this.testBoolean = testBoolean; + } + public float getTestFloat() { + return testFloat; + } + public void setTestFloat(float testFloat) { + this.testFloat = testFloat; + } + public double getTestDouble() { + return testDouble; + } + public void setTestDouble(double testDouble) { + this.testDouble = testDouble; + } + public String getTestString() { + return testString; + } + public void setTestString(String testString) { + this.testString = testString; + } + private int getPrivateInt() { + return privateInt; + } + private void setPrivateInt(int privateInt) { + this.privateInt = privateInt; + } + + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 73ed7b8..909fe4a 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1725,25 +1725,43 @@ unavailable status. All events sent in one post request are considered to be one batch and inserted into the channel in one transaction. -================= ============================================ ===================================================================================== -Property Name Default Description -================= ============================================ ===================================================================================== -**type** The component type name, needs to be ``http`` -**port** -- The port the source should bind to. -bind 0.0.0.0 The hostname or IP address to listen on -handler ``org.apache.flume.source.http.JSONHandler`` The FQCN of the handler class. -handler.* -- Config parameters for the handler -selector.type replicating replicating or multiplexing -selector.* Depends on the selector.type value -interceptors -- Space-separated list of interceptors +This source is based on Jetty 9.4 and offers the ability to set additional +Jetty-specific parameters which will be passed directly to the Jetty components. + +==================== ============================================ ===================================================================================== +Property Name Default Description +==================== ============================================ ===================================================================================== +**type** The component type name, needs to be ``http`` +**port** -- The port the source should bind to. +bind 0.0.0.0 The hostname or IP address to listen on +handler ``org.apache.flume.source.http.JSONHandler`` The FQCN of the handler class. +handler.* -- Config parameters for the handler +selector.type replicating replicating or multiplexing +selector.* Depends on the selector.type value +interceptors -- Space-separated list of interceptors interceptors.* -enableSSL false Set the property true, to enable SSL. *HTTP Source does not support SSLv3.* -excludeProtocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded. -keystore Location of the keystore includng keystore file name -keystorePassword Keystore password -====================================================================================================================================================== - -For example, a http source for agent named a1: +enableSSL false Set the property true, to enable SSL. *HTTP Source does not support SSLv3.* +excludeProtocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded. +keystore Location of the keystore includng keystore file name +keystorePassword Keystore password +QueuedThreadPool.* Jetty specific settings to be set on org.eclipse.jetty.util.thread.QueuedThreadPool. + N.B. QueuedThreadPool will only be used if at least one property of this class is set. +HttpConfiguration.* Jetty specific settings to be set on org.eclipse.jetty.server.HttpConfiguration +SslContextFactory.* Jetty specific settings to be set on org.eclipse.jetty.util.ssl.SslContextFactory (only + applicable when *enableSSL* is set to true). +ServerConnector.* Jetty specific settings to be set on org.eclipse.jetty.server.ServerConnector +========================================================================================================================================================= + +N.B. Jetty-specific settings are set using the setter-methods on the objects listed above. For full details see the Javadoc for these classes +(`QueuedThreadPool `_, +`HttpConfiguration `_, +`SslContextFactory `_ and +`ServerConnector `_). + +When using Jetty-specific setings, named properites above will take precedence (for example excludeProtocols will take +precedence over SslContextFactory.ExcludeProtocols). All properties will be inital lower case. + +An example http source for agent named a1: .. code-block:: properties @@ -1754,6 +1772,8 @@ For example, a http source for agent named a1: a1.sources.r1.channels = c1 a1.sources.r1.handler = org.example.rest.RestHandler a1.sources.r1.handler.nickname = random props + a1.sources.r1.HttpConfiguration.sendServerVersion = false + a1.sources.r1.ServerConnector.idleTimeout = 300 JSONHandler ''''''''''' http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-legacy-sources/flume-avro-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-legacy-sources/flume-avro-source/pom.xml b/flume-ng-legacy-sources/flume-avro-source/pom.xml index a87207a..e9cfab7 100644 --- a/flume-ng-legacy-sources/flume-avro-source/pom.xml +++ b/flume-ng-legacy-sources/flume-avro-source/pom.xml @@ -130,11 +130,6 @@ limitations under the License. avro-ipc - - org.mortbay.jetty - servlet-api - - http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-legacy-sources/flume-thrift-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-legacy-sources/flume-thrift-source/pom.xml b/flume-ng-legacy-sources/flume-thrift-source/pom.xml index 8301c45..91cd562 100644 --- a/flume-ng-legacy-sources/flume-thrift-source/pom.xml +++ b/flume-ng-legacy-sources/flume-thrift-source/pom.xml @@ -161,11 +161,6 @@ limitations under the License. - org.mortbay.jetty - servlet-api - - - commons-lang commons-lang http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-sinks/flume-http-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-http-sink/pom.xml b/flume-ng-sinks/flume-http-sink/pom.xml index 952e8e7..52b038a 100644 --- a/flume-ng-sinks/flume-http-sink/pom.xml +++ b/flume-ng-sinks/flume-http-sink/pom.xml @@ -122,7 +122,6 @@ limitations under the License. org.apache.httpcomponents httpclient - ${httpclient.version} test http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/FlumeHttpServletRequestWrapper.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/FlumeHttpServletRequestWrapper.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/FlumeHttpServletRequestWrapper.java index 9711a3a..5c7c8e3 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/FlumeHttpServletRequestWrapper.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/FlumeHttpServletRequestWrapper.java @@ -23,25 +23,51 @@ import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.security.Principal; +import java.util.Collection; import java.util.Collections; import java.util.Enumeration; import java.util.Locale; import java.util.Map; +import javax.servlet.AsyncContext; +import javax.servlet.DispatcherType; +import javax.servlet.ReadListener; import javax.servlet.RequestDispatcher; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; import javax.servlet.ServletInputStream; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; +import javax.servlet.http.HttpUpgradeHandler; +import javax.servlet.http.Part; class FlumeHttpServletRequestWrapper implements HttpServletRequest { private ServletInputStream stream; private String charset; - + public FlumeHttpServletRequestWrapper(final byte[] data) { stream = new ServletInputStream() { - private final InputStream in = new ByteArrayInputStream(data); + @Override + public boolean isFinished() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isReady() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void setReadListener(ReadListener readListener) { + throw new UnsupportedOperationException("Not supported yet."); + } + + private final InputStream in = new ByteArrayInputStream(data); @Override public int read() throws IOException { return in.read(); @@ -155,6 +181,11 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { } @Override + public String changeSessionId() { + return null; + } + + @Override public boolean isRequestedSessionIdValid() { throw new UnsupportedOperationException("Not supported yet."); } @@ -175,12 +206,43 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { } @Override + public boolean authenticate(HttpServletResponse response) throws IOException, ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void login(String username, String password) throws ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void logout() throws ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public Collection getParts() throws IOException, ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public Part getPart(String name) throws IOException, ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public T upgrade(Class handlerClass) + throws IOException, ServletException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override public Object getAttribute(String name) { throw new UnsupportedOperationException("Not supported yet."); } @Override - public Enumeration getAttributeNames() { + public Enumeration getAttributeNames() { throw new UnsupportedOperationException("Not supported yet."); } @@ -200,6 +262,11 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { } @Override + public long getContentLengthLong() { + return 0; + } + + @Override public String getContentType() { return null; } @@ -280,7 +347,7 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { } @Override - public Enumeration getLocales() { + public Enumeration getLocales() { throw new UnsupportedOperationException("Not supported yet."); } @@ -318,4 +385,40 @@ class FlumeHttpServletRequestWrapper implements HttpServletRequest { public int getLocalPort() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public ServletContext getServletContext() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public AsyncContext startAsync() throws IllegalStateException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) + throws IllegalStateException { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isAsyncStarted() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isAsyncSupported() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public AsyncContext getAsyncContext() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public DispatcherType getDispatcherType() { + throw new UnsupportedOperationException("Not supported yet."); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/flume-ng-sources/flume-scribe-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-scribe-source/pom.xml b/flume-ng-sources/flume-scribe-source/pom.xml index e11f16d..dc46c62 100644 --- a/flume-ng-sources/flume-scribe-source/pom.xml +++ b/flume-ng-sources/flume-scribe-source/pom.xml @@ -160,11 +160,6 @@ limitations under the License. libthrift - - org.mortbay.jetty - servlet-api - - http://git-wip-us.apache.org/repos/asf/flume/blob/580f7813/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4cb3881..da84b5b 100644 --- a/pom.xml +++ b/pom.xml @@ -70,11 +70,11 @@ limitations under the License. 18.0 11.0.2 2.4.0 - 4.3.5 - 4.2.1 + 4.4.6 + 4.5.3 1.10 1.8 - 6.1.26 + 9.4.6.v20170531 2.9.9 4.10 0.9.0.1 @@ -86,7 +86,6 @@ limitations under the License. 0.9.9 2.0.4 1.9.0 - 2.5-20110124 1.7 2.3 1.7 @@ -1073,27 +1072,39 @@ limitations under the License. - org.mortbay.jetty - servlet-api - ${mortbay-jetty-servlet-api.version} + org.eclipse.jetty + jetty-servlet + ${jetty.version} - org.mortbay.jetty + org.eclipse.jetty jetty-util ${jetty.version} - org.mortbay.jetty - jetty + org.eclipse.jetty + jetty-server + ${jetty.version} + + + + org.eclipse.jetty + jetty-jmx ${jetty.version} org.apache.httpcomponents httpclient - ${httpclient-old.version} + ${httpclient.version} + + + + org.apache.httpcomponents + httpcore + ${httpcore.version}