From commits-return-2918-archive-asf-public=cust-asf.ponee.io@flume.apache.org Fri Oct 5 11:42:48 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 80FE8180677 for ; Fri, 5 Oct 2018 11:42:46 +0200 (CEST) Received: (qmail 72662 invoked by uid 500); 5 Oct 2018 09:42:45 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 72653 invoked by uid 99); 5 Oct 2018 09:42:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Oct 2018 09:42:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0E1FFE012E; Fri, 5 Oct 2018 09:42:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: szaboferee@apache.org To: commits@flume.apache.org Date: Fri, 05 Oct 2018 09:42:46 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] flume git commit: FLUME-3269: Support JSSE keystore/trustore -D system properties FLUME-3269: Support JSSE keystore/trustore -D system properties It makes possible to specify global/common SSL keystore parameters (path, password and type) at Flume agent (process) level for all sources/sinks. In this way, it is not necessary to define (=copy) the SSL config for each component in the agent config. The global SSL parameters can be specified through the standard -D JSSE system properties or in environment variables. Component level configuration is still possible. Priority: 1. component parameters in agent config 2. -D system properties 2. environment variables This closes #228 Reviewers: Ferenc Szabo, Tristan Stevens, Endre Major (Peter Turcsanyi via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c5168c90 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c5168c90 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c5168c90 Branch: refs/heads/trunk Commit: c5168c902634e8ea1f25ec578ed0b7055b246d68 Parents: 1b43783 Author: Peter Turcsanyi Authored: Fri Oct 5 11:40:28 2018 +0200 Committer: Ferenc Szabo Committed: Fri Oct 5 11:40:28 2018 +0200 ---------------------------------------------------------------------- .../apache/flume/client/avro/AvroCLIClient.java | 3 + .../org/apache/flume/source/AvroSource.java | 8 +- .../org/apache/flume/source/ThriftSource.java | 8 +- .../apache/flume/source/http/HTTPSource.java | 9 +- .../org/apache/flume/sink/TestAvroSink.java | 253 +++++++++---------- .../org/apache/flume/sink/TestThriftSink.java | 167 ++++++------ .../org/apache/flume/source/TestAvroSource.java | 31 ++- .../apache/flume/source/TestThriftSource.java | 47 +++- .../flume/source/http/TestHTTPSource.java | 134 ++++++---- flume-ng-doc/sphinx/FlumeUserGuide.rst | 217 +++++++++++++--- .../java/org/apache/flume/node/Application.java | 8 +- .../apache/flume/api/NettyAvroRpcClient.java | 15 +- .../org/apache/flume/api/ThriftRpcClient.java | 12 +- .../java/org/apache/flume/util/SSLUtil.java | 106 ++++++++ .../apache/flume/api/TestThriftRpcClient.java | 10 +- .../apache/flume/util/AbstractSSLUtilTest.java | 124 +++++++++ .../flume/util/SSLUtilKeystorePasswordTest.java | 49 ++++ .../flume/util/SSLUtilKeystorePathTest.java | 48 ++++ .../flume/util/SSLUtilKeystoreTypeTest.java | 48 ++++ .../SSLUtilKeystoreTypeWithDefaultTest.java | 64 +++++ .../util/SSLUtilTruststorePasswordTest.java | 49 ++++ .../flume/util/SSLUtilTruststorePathTest.java | 48 ++++ .../flume/util/SSLUtilTruststoreTypeTest.java | 48 ++++ .../SSLUtilTruststoreTypeWithDefaultTest.java | 64 +++++ 24 files changed, 1233 insertions(+), 337 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java index 242c821..0f0fda6 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java @@ -44,6 +44,7 @@ import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientFactory; import org.apache.flume.instrumentation.SourceCounter; +import org.apache.flume.util.SSLUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +67,8 @@ public class AvroCLIClient { private int sent; public static void main(String[] args) { + SSLUtil.initGlobalSSLParameters(); + AvroCLIClient client = new AvroCLIClient(); try { http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java index a105bbe..e7b12bd 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java @@ -41,6 +41,7 @@ import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.flume.source.avro.AvroSourceProtocol; import org.apache.flume.source.avro.Status; +import org.apache.flume.util.SSLUtil; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; @@ -180,9 +181,10 @@ public class AvroSource extends AbstractSource implements EventDrivenSource, } enableSsl = context.getBoolean(SSL_KEY, false); - keystore = context.getString(KEYSTORE_KEY); - keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY); - keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS"); + keystore = context.getString(KEYSTORE_KEY, SSLUtil.getGlobalKeystorePath()); + keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY, + SSLUtil.getGlobalKeystorePassword()); + keystoreType = context.getString(KEYSTORE_TYPE_KEY, SSLUtil.getGlobalKeystoreType("JKS")); String excludeProtocolsStr = context.getString(EXCLUDE_PROTOCOLS); if (excludeProtocolsStr == null) { excludeProtocols.add("SSLv3"); http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java index 33c37f2..637c42e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java @@ -34,6 +34,7 @@ import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.thrift.Status; import org.apache.flume.thrift.ThriftSourceProtocol; import org.apache.flume.thrift.ThriftFlumeEvent; +import org.apache.flume.util.SSLUtil; import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TBinaryProtocol; @@ -160,9 +161,10 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD enableSsl = context.getBoolean(SSL_KEY, false); if (enableSsl) { - keystore = context.getString(KEYSTORE_KEY); - keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY); - keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS"); + keystore = context.getString(KEYSTORE_KEY, SSLUtil.getGlobalKeystorePath()); + keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY, + SSLUtil.getGlobalKeystorePassword()); + keystoreType = context.getString(KEYSTORE_TYPE_KEY, SSLUtil.getGlobalKeystoreType("JKS")); String excludeProtocolsStr = context.getString(EXCLUDE_PROTOCOLS); if (excludeProtocolsStr == null) { excludeProtocols.add("SSLv3"); http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java index d14bde2..e9324fb 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java @@ -28,6 +28,7 @@ import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.source.AbstractSource; import org.apache.flume.tools.FlumeBeanConfigurator; import org.apache.flume.tools.HTTPServerConstraintUtil; +import org.apache.flume.util.SSLUtil; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.SecureRequestCustomizer; @@ -130,11 +131,13 @@ public class HTTPSource extends AbstractSource implements if (sslEnabled) { LOG.debug("SSL configuration enabled"); - keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE); + keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE, + SSLUtil.getGlobalKeystorePath()); Preconditions.checkArgument(keyStorePath != null && !keyStorePath.isEmpty(), "Keystore is required for SSL Conifguration" ); - keyStorePassword = - context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD); + keyStorePassword = context.getString( + HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, + SSLUtil.getGlobalKeystorePassword()); Preconditions.checkArgument(keyStorePassword != null, "Keystore password is required for SSL Configuration"); String excludeProtocolsStr = http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java index 8b6f493..cc2c91a 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java @@ -103,13 +103,8 @@ public class TestAvroSink { sink = new AvroSink(); channel = new MemoryChannel(); - Context context = new Context(); + Context context = createBaseContext(); - context.put("hostname", hostname); - context.put("port", String.valueOf(port)); - context.put("batch-size", String.valueOf(2)); - context.put("connect-timeout", String.valueOf(2000L)); - context.put("request-timeout", String.valueOf(3000L)); if (compressionType.equals("deflate")) { context.put("compression-type", compressionType); context.put("compression-level", Integer.toString(compressionLevel)); @@ -121,6 +116,28 @@ public class TestAvroSink { Configurables.configure(channel, context); } + private Context createBaseContext() { + Context context = new Context(); + + context.put("hostname", hostname); + context.put("port", String.valueOf(port)); + context.put("batch-size", String.valueOf(2)); + context.put("connect-timeout", String.valueOf(2000L)); + context.put("request-timeout", String.valueOf(3000L)); + + return context; + } + + private Server createServer(AvroSourceProtocol protocol) + throws IllegalAccessException, InstantiationException { + + Server server = new NettyServer(new SpecificResponder( + AvroSourceProtocol.class, protocol), new InetSocketAddress( + hostname, port)); + + return server; + } + @Test public void testLifecycle() throws InterruptedException, InstantiationException, IllegalAccessException { @@ -384,85 +401,101 @@ public class TestAvroSink { } @Test - public void testSslProcess() throws InterruptedException, + public void testSslProcessTrustAllCerts() throws InterruptedException, EventDeliveryException, InstantiationException, IllegalAccessException { setUp(); - Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); - Server server = createSslServer(new MockAvroServer()); - - server.start(); - - Context context = new Context(); - context.put("hostname", hostname); - context.put("port", String.valueOf(port)); + Context context = createBaseContext(); context.put("ssl", String.valueOf(true)); context.put("trust-all-certs", String.valueOf(true)); - context.put("batch-size", String.valueOf(2)); - context.put("connect-timeout", String.valueOf(2000L)); - context.put("request-timeout", String.valueOf(3000L)); Configurables.configure(sink, context); - sink.start(); - Assert.assertTrue(LifecycleController.waitForOneOf(sink, - LifecycleState.START_OR_ERROR, 5000)); + doTestSslProcess(); + } - Transaction transaction = channel.getTransaction(); + @Test + public void testSslProcessWithComponentTruststore() throws InterruptedException, + EventDeliveryException, InstantiationException, IllegalAccessException { + setUp(); - transaction.begin(); - for (int i = 0; i < 10; i++) { - channel.put(event); - } - transaction.commit(); - transaction.close(); + Context context = createBaseContext(); + context.put("ssl", String.valueOf(true)); + context.put("truststore", "src/test/resources/truststore.jks"); + context.put("truststore-password", "password"); - for (int i = 0; i < 5; i++) { - Sink.Status status = sink.process(); - Assert.assertEquals(Sink.Status.READY, status); - } + Configurables.configure(sink, context); - Assert.assertEquals(Sink.Status.BACKOFF, sink.process()); + doTestSslProcess(); + } - sink.stop(); - Assert.assertTrue(LifecycleController.waitForOneOf(sink, - LifecycleState.STOP_OR_ERROR, 5000)); + @Test + public void testSslProcessWithComponentTruststoreNoPassword() throws InterruptedException, + EventDeliveryException, InstantiationException, IllegalAccessException { + setUp(); - server.close(); + Context context = createBaseContext(); + context.put("ssl", String.valueOf(true)); + context.put("truststore", "src/test/resources/truststore.jks"); + + Configurables.configure(sink, context); + + doTestSslProcess(); } @Test - public void testSslProcessWithTrustStore() throws InterruptedException, + public void testSslProcessWithGlobalTruststore() throws InterruptedException, EventDeliveryException, InstantiationException, IllegalAccessException { setUp(); - Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); - Server server = createSslServer(new MockAvroServer()); - server.start(); + System.setProperty("javax.net.ssl.trustStore", "src/test/resources/truststore.jks"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); - Context context = new Context(); + Context context = createBaseContext(); + context.put("ssl", String.valueOf(true)); - context.put("hostname", hostname); - context.put("port", String.valueOf(port)); + Configurables.configure(sink, context); + + doTestSslProcess(); + + System.clearProperty("javax.net.ssl.trustStore"); + System.clearProperty("javax.net.ssl.trustStorePassword"); + } + + @Test + public void testSslProcessWithGlobalTruststoreNoPassword() throws InterruptedException, + EventDeliveryException, InstantiationException, IllegalAccessException { + setUp(); + + System.setProperty("javax.net.ssl.trustStore", "src/test/resources/truststore.jks"); + + Context context = createBaseContext(); context.put("ssl", String.valueOf(true)); - context.put("truststore", "src/test/resources/truststore.jks"); - context.put("truststore-password", "password"); - context.put("batch-size", String.valueOf(2)); - context.put("connect-timeout", String.valueOf(2000L)); - context.put("request-timeout", String.valueOf(3000L)); Configurables.configure(sink, context); + doTestSslProcess(); + + System.clearProperty("javax.net.ssl.trustStore"); + } + + private void doTestSslProcess() throws InterruptedException, + EventDeliveryException, InstantiationException, IllegalAccessException { + Server server = createSslServer(new MockAvroServer()); + server.start(); + sink.start(); Assert.assertTrue(LifecycleController.waitForOneOf(sink, LifecycleState.START_OR_ERROR, 5000)); Transaction transaction = channel.getTransaction(); - transaction.begin(); + + Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); for (int i = 0; i < 10; i++) { channel.put(event); } + transaction.commit(); transaction.close(); @@ -480,16 +513,6 @@ public class TestAvroSink { server.close(); } - private Server createServer(AvroSourceProtocol protocol) - throws IllegalAccessException, InstantiationException { - - Server server = new NettyServer(new SpecificResponder( - AvroSourceProtocol.class, protocol), new InetSocketAddress( - hostname, port)); - - return server; - } - @Test public void testSslWithCompression() throws InterruptedException, EventDeliveryException, InstantiationException, IllegalAccessException { @@ -538,15 +561,9 @@ public class TestAvroSink { Event event = EventBuilder.withBody("Hello avro", Charset.forName("UTF8")); - context = new Context(); - - context.put("hostname", hostname); - context.put("port", String.valueOf(port)); + context = createBaseContext(); context.put("ssl", String.valueOf(true)); context.put("trust-all-certs", String.valueOf(true)); - context.put("batch-size", String.valueOf(2)); - context.put("connect-timeout", String.valueOf(2000L)); - context.put("request-timeout", String.valueOf(3000L)); context.put("compression-type", "deflate"); context.put("compression-level", Integer.toString(6)); @@ -591,57 +608,25 @@ public class TestAvroSink { @Test public void testSslSinkWithNonSslServer() throws InterruptedException, - EventDeliveryException, InstantiationException, IllegalAccessException { + InstantiationException, IllegalAccessException { setUp(); - Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); - Server server = createServer(new MockAvroServer()); + Server server = createServer(new MockAvroServer()); server.start(); - Context context = new Context(); - - context.put("hostname", hostname); - context.put("port", String.valueOf(port)); + Context context = createBaseContext(); context.put("ssl", String.valueOf(true)); context.put("trust-all-certs", String.valueOf(true)); - context.put("batch-size", String.valueOf(2)); - context.put("connect-timeout", String.valueOf(2000L)); - context.put("request-timeout", String.valueOf(3000L)); Configurables.configure(sink, context); - sink.start(); - Assert.assertTrue(LifecycleController.waitForOneOf(sink, - LifecycleState.START_OR_ERROR, 5000)); - - Transaction transaction = channel.getTransaction(); - - transaction.begin(); - for (int i = 0; i < 10; i++) { - channel.put(event); - } - transaction.commit(); - transaction.close(); - - boolean failed = false; - try { - for (int i = 0; i < 5; i++) { - sink.process(); - failed = true; - } - } catch (EventDeliveryException ex) { - logger.info("Correctly failed to send event", ex); - } - - - sink.stop(); - Assert.assertTrue(LifecycleController.waitForOneOf(sink, - LifecycleState.STOP_OR_ERROR, 5000)); + boolean failed = doRequestWhenFailureExpected(); server.close(); - if (failed) { - Assert.fail("SSL-enabled sink successfully connected to a non-SSL-enabled server, that's wrong."); + if (!failed) { + Assert.fail("SSL-enabled sink successfully connected to a non-SSL-enabled server, " + + "that's wrong."); } SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter"); @@ -649,61 +634,59 @@ public class TestAvroSink { } @Test - public void testSslSinkWithNonTrustedCert() - throws InterruptedException, EventDeliveryException, InstantiationException, - IllegalAccessException { + public void testSslSinkWithNonTrustedCert() throws InterruptedException, + InstantiationException, IllegalAccessException { setUp(); - Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); - Server server = createSslServer(new MockAvroServer()); + Server server = createSslServer(new MockAvroServer()); server.start(); - Context context = new Context(); - - context.put("hostname", hostname); - context.put("port", String.valueOf(port)); + Context context = createBaseContext(); context.put("ssl", String.valueOf(true)); - context.put("batch-size", String.valueOf(2)); - context.put("connect-timeout", String.valueOf(2000L)); - context.put("request-timeout", String.valueOf(3000L)); Configurables.configure(sink, context); + boolean failed = doRequestWhenFailureExpected(); + + server.close(); + + if (!failed) { + Assert.fail("SSL-enabled sink successfully connected to a server with an " + + "untrusted certificate when it should have failed"); + } + SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter"); + Assert.assertEquals(1, sinkCounter.getEventWriteFail()); + } + + private boolean doRequestWhenFailureExpected() + throws InterruptedException { sink.start(); Assert.assertTrue(LifecycleController.waitForOneOf(sink, LifecycleState.START_OR_ERROR, 5000)); Transaction transaction = channel.getTransaction(); - transaction.begin(); - for (int i = 0; i < 10; i++) { - channel.put(event); - } + + Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); + channel.put(event); + transaction.commit(); transaction.close(); - boolean failed = false; + boolean failed; try { - for (int i = 0; i < 5; i++) { - sink.process(); - failed = true; - } + sink.process(); + failed = false; } catch (EventDeliveryException ex) { logger.info("Correctly failed to send event", ex); + failed = true; } - sink.stop(); Assert.assertTrue(LifecycleController.waitForOneOf(sink, LifecycleState.STOP_OR_ERROR, 5000)); - server.close(); - - if (failed) { - Assert.fail("SSL-enabled sink successfully connected to a server with an untrusted certificate when it should have failed"); - } - SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter"); - Assert.assertEquals(1, sinkCounter.getEventWriteFail()); + return failed; } @Test http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java index 687c635..c573fe7 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java @@ -60,17 +60,24 @@ public class TestThriftSink { try (ServerSocket socket = new ServerSocket(0)) { port = socket.getLocalPort(); } + Context context = createBaseContext(); + context.put(ThriftRpcClient.CONFIG_PROTOCOL, ThriftRpcClient.COMPACT_PROTOCOL); + sink.setChannel(channel); + + Configurables.configure(sink, context); + Configurables.configure(channel, context); + } + + private Context createBaseContext() { Context context = new Context(); context.put("hostname", hostname); context.put("port", String.valueOf(port)); context.put("batch-size", String.valueOf(2)); + context.put("connect-timeout", String.valueOf(2000L)); context.put("request-timeout", String.valueOf(2000L)); - context.put(ThriftRpcClient.CONFIG_PROTOCOL, ThriftRpcClient.COMPACT_PROTOCOL); - sink.setChannel(channel); - Configurables.configure(sink, context); - Configurables.configure(channel, context); + return context; } @After @@ -146,7 +153,7 @@ public class TestThriftSink { sink.process(); // should throw another EventDeliveryException due to request timeout - delay.set(2500L); // because request-timeout = 3000 + delay.set(2500L); // because request-timeout = 2000 threw = false; try { sink.process(); @@ -201,32 +208,77 @@ public class TestThriftSink { } @Test - public void testSslProcess() throws Exception { - Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); - src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), port, - ThriftRpcClient.COMPACT_PROTOCOL, "src/test/resources/keystorefile.jks", - "password", KeyManagerFactory.getDefaultAlgorithm(), "JKS"); - Context context = new Context(); - context.put("hostname", hostname); - context.put("port", String.valueOf(port)); + public void testSslProcessWithComponentTruststore() throws Exception { + Context context = createBaseContext(); context.put("ssl", String.valueOf(true)); - context.put("batch-size", String.valueOf(2)); - context.put("connect-timeout", String.valueOf(2000L)); - context.put("request-timeout", String.valueOf(3000L)); context.put("truststore", "src/test/resources/truststorefile.jks"); context.put("truststore-password", "password"); - context.put("trustmanager-type", TrustManagerFactory.getDefaultAlgorithm()); Configurables.configure(sink, context); + + doTestSslProcess(); + } + + @Test + public void testSslProcessWithComponentTruststoreNoPassword() throws Exception { + Context context = createBaseContext(); + context.put("ssl", String.valueOf(true)); + context.put("truststore", "src/test/resources/truststorefile.jks"); + + Configurables.configure(sink, context); + + doTestSslProcess(); + } + + @Test + public void testSslProcessWithGlobalTruststore() throws Exception { + System.setProperty("javax.net.ssl.trustStore", "src/test/resources/truststorefile.jks"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + + Context context = createBaseContext(); + context.put("ssl", String.valueOf(true)); + + Configurables.configure(sink, context); + + doTestSslProcess(); + + System.clearProperty("javax.net.ssl.trustStore"); + System.clearProperty("javax.net.ssl.trustStorePassword"); + } + + @Test + public void testSslProcessWithGlobalTruststoreNoPassword() throws Exception { + System.setProperty("javax.net.ssl.trustStore", "src/test/resources/truststorefile.jks"); + + Context context = createBaseContext(); + context.put("ssl", String.valueOf(true)); + + Configurables.configure(sink, context); + + doTestSslProcess(); + + System.clearProperty("javax.net.ssl.trustStore"); + } + + private void doTestSslProcess() throws Exception { + src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), port, + ThriftRpcClient.COMPACT_PROTOCOL, "src/test/resources/keystorefile.jks", + "password", KeyManagerFactory.getDefaultAlgorithm(), "JKS"); + channel.start(); sink.start(); + Transaction transaction = channel.getTransaction(); transaction.begin(); + + Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); for (int i = 0; i < 11; i++) { channel.put(event); } + transaction.commit(); transaction.close(); + for (int i = 0; i < 6; i++) { Sink.Status status = sink.process(); Assert.assertEquals(Sink.Status.READY, status); @@ -241,48 +293,18 @@ public class TestThriftSink { @Test public void testSslSinkWithNonSslServer() throws Exception { - Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), port, ThriftRpcClient.COMPACT_PROTOCOL); - Context context = new Context(); - context.put("hostname", hostname); - context.put("port", String.valueOf(port)); + Context context = createBaseContext(); context.put("ssl", String.valueOf(true)); - context.put("batch-size", String.valueOf(2)); - context.put("connect-timeout", String.valueOf(2000L)); - context.put("request-timeout", String.valueOf(3000L)); context.put("truststore", "src/test/resources/truststorefile.jks"); context.put("truststore-password", "password"); - context.put("trustmanager-type", TrustManagerFactory.getDefaultAlgorithm()); Configurables.configure(sink, context); - channel.start(); - sink.start(); - Assert.assertTrue(LifecycleController.waitForOneOf(sink, - LifecycleState.START_OR_ERROR, 5000)); - Transaction transaction = channel.getTransaction(); - transaction.begin(); - for (int i = 0; i < 11; i++) { - channel.put(event); - } - transaction.commit(); - transaction.close(); - boolean failed = false; - try { - for (int i = 0; i < 6; i++) { - Sink.Status status = sink.process(); - failed = true; - } - } catch (EventDeliveryException ex) { - // This is correct - } - - sink.stop(); - Assert.assertTrue(LifecycleController.waitForOneOf(sink, - LifecycleState.STOP_OR_ERROR, 5000)); - if (failed) { + boolean failed = doRequestWhenFailureExpected(); + if (!failed) { Assert.fail("SSL-enabled sink successfully connected to a non-SSL-enabled server, " + "that's wrong."); } @@ -290,48 +312,51 @@ public class TestThriftSink { @Test public void testSslSinkWithNonTrustedCert() throws Exception { - Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), port, ThriftRpcClient.COMPACT_PROTOCOL, "src/test/resources/keystorefile.jks", "password", KeyManagerFactory.getDefaultAlgorithm(), "JKS"); - Context context = new Context(); - context.put("hostname", hostname); - context.put("port", String.valueOf(port)); + Context context = createBaseContext(); context.put("ssl", String.valueOf(true)); - context.put("batch-size", String.valueOf(2)); - context.put("connect-timeout", String.valueOf(2000L)); - context.put("request-timeout", String.valueOf(3000L)); Configurables.configure(sink, context); + + boolean failed = doRequestWhenFailureExpected(); + if (!failed) { + Assert.fail("SSL-enabled sink successfully connected to a server with an " + + "untrusted certificate when it should have failed"); + } + } + + private boolean doRequestWhenFailureExpected() throws Exception { channel.start(); sink.start(); Assert.assertTrue(LifecycleController.waitForOneOf(sink, - LifecycleState.START_OR_ERROR, 5000)); + LifecycleState.START_OR_ERROR, 5000)); + Transaction transaction = channel.getTransaction(); transaction.begin(); - for (int i = 0; i < 11; i++) { - channel.put(event); - } + + Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); + channel.put(event); + transaction.commit(); transaction.close(); - boolean failed = false; + boolean failed; try { - for (int i = 0; i < 6; i++) { - Sink.Status status = sink.process(); - failed = true; - } + Sink.Status status = sink.process(); + failed = false; } catch (EventDeliveryException ex) { // This is correct + failed = true; } sink.stop(); Assert.assertTrue(LifecycleController.waitForOneOf(sink, - LifecycleState.STOP_OR_ERROR, 5000)); - if (failed) { - Assert.fail("SSL-enabled sink successfully connected to a server with an " + - "untrusted certificate when it should have failed"); - } + LifecycleState.STOP_OR_ERROR, 5000)); + + return failed; } + } http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java index 6f784ea..21e65ad 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java @@ -329,7 +329,7 @@ public class TestAvroSource { } @Test - public void testSslRequest() throws InterruptedException, IOException { + public void testSslRequestWithComponentKeystore() throws InterruptedException, IOException { Context context = new Context(); @@ -339,7 +339,34 @@ public class TestAvroSource { context.put("keystore", "src/test/resources/server.p12"); context.put("keystore-password", "password"); context.put("keystore-type", "PKCS12"); + + Configurables.configure(source, context); + + doSslRequest(); + } + + @Test + public void testSslRequestWithGlobalKeystore() throws InterruptedException, IOException { + + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.p12"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "PKCS12"); + + Context context = new Context(); + + context.put("port", String.valueOf(selectedPort = getFreePort())); + context.put("bind", "0.0.0.0"); + context.put("ssl", "true"); + Configurables.configure(source, context); + + doSslRequest(); + + System.clearProperty("javax.net.ssl.keyStore"); + System.clearProperty("javax.net.ssl.keyStorePassword"); + } + + private void doSslRequest() throws InterruptedException, IOException { source.start(); Assert @@ -350,7 +377,7 @@ public class TestAvroSource { AvroSourceProtocol client = SpecificRequestor.getClient( AvroSourceProtocol.class, new NettyTransceiver(new InetSocketAddress( - selectedPort), new SSLChannelFactory())); + selectedPort), new SSLChannelFactory())); AvroFlumeEvent avroEvent = new AvroFlumeEvent(); http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java index d594276..610d6fc 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java @@ -102,24 +102,55 @@ public class TestThriftSource { } @Test - public void testAppendSSL() throws Exception { - Properties sslprops = (Properties)props.clone(); - sslprops.put("ssl", "true"); - sslprops.put("truststore", "src/test/resources/truststorefile.jks"); - sslprops.put("truststore-password", "password"); - sslprops.put("trustmanager-type", TrustManagerFactory.getDefaultAlgorithm()); - client = RpcClientFactory.getThriftInstance(sslprops); + public void testAppendSSLWithComponentKeystore() throws Exception { Context context = new Context(); channel.configure(context); configureSource(); + context.put(ThriftSource.CONFIG_BIND, "0.0.0.0"); context.put(ThriftSource.CONFIG_PORT, String.valueOf(port)); context.put("ssl", "true"); context.put("keystore", "src/test/resources/keystorefile.jks"); context.put("keystore-password", "password"); - context.put("keymanager-type", KeyManagerFactory.getDefaultAlgorithm()); + context.put("keystore-type", "JKS"); + + Configurables.configure(source, context); + + doAppendSSL(); + } + + @Test + public void testAppendSSLWithGlobalKeystore() throws Exception { + + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/keystorefile.jks"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "JKS"); + + Context context = new Context(); + channel.configure(context); + configureSource(); + + context.put(ThriftSource.CONFIG_BIND, "0.0.0.0"); + context.put(ThriftSource.CONFIG_PORT, String.valueOf(port)); + context.put("ssl", "true"); + Configurables.configure(source, context); + + doAppendSSL(); + + System.clearProperty("javax.net.ssl.keyStore"); + System.clearProperty("javax.net.ssl.keyStorePassword"); + System.clearProperty("javax.net.ssl.keyStoreType"); + } + + private void doAppendSSL() throws EventDeliveryException { + Properties sslprops = (Properties)props.clone(); + sslprops.put("ssl", "true"); + sslprops.put("truststore", "src/test/resources/truststorefile.jks"); + sslprops.put("truststore-password", "password"); + client = RpcClientFactory.getThriftInstance(sslprops); + source.start(); for (int i = 0; i < 30; i++) { client.append(EventBuilder.withBody(String.valueOf(i).getBytes())); http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java index 04eec24..39949e2 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java @@ -22,7 +22,6 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import junit.framework.Assert; import org.apache.flume.Channel; -import org.apache.flume.ChannelException; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.Event; @@ -90,15 +89,18 @@ import static org.mockito.Mockito.doThrow; */ public class TestHTTPSource { - private static HTTPSource source; + private static HTTPSource httpSource; private static HTTPSource httpsSource; + private static HTTPSource httpsGlobalKeystoreSource; - private static Channel channel; + private static Channel httpChannel; private static Channel httpsChannel; - private static int selectedPort; - private static int sslPort; - HttpClient httpClient; - HttpPost postRequest; + private static Channel httpsGlobalKeystoreChannel; + private static int httpPort; + private static int httpsPort; + private static int httpsGlobalKeystorePort; + private HttpClient httpClient; + private HttpPost postRequest; private static int findFreePort() throws IOException { ServerSocket socket = new ServerSocket(0); @@ -107,17 +109,17 @@ public class TestHTTPSource { return port; } - private static Context getDefaultNonSecureContext(int selectedPort) throws IOException { + private static Context getDefaultNonSecureContext(int port) throws IOException { Context ctx = new Context(); ctx.put(HTTPSourceConfigurationConstants.CONFIG_BIND, "0.0.0.0"); - ctx.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(selectedPort)); + ctx.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(port)); ctx.put("QueuedThreadPool.MaxThreads", "100"); return ctx; } - private static Context getDefaultSecureContext(int sslPort) throws IOException { + private static Context getDefaultSecureContext(int port) throws IOException { Context sslContext = new Context(); - sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(sslPort)); + sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(port)); sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true"); sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, "password"); sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, @@ -125,21 +127,42 @@ public class TestHTTPSource { return sslContext; } + private static Context getDefaultSecureContextGlobalKeystore(int port) throws IOException { + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/jettykeystore"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + + Context sslContext = new Context(); + sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(port)); + sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true"); + return sslContext; + } + @BeforeClass public static void setUpClass() throws Exception { - source = new HTTPSource(); - channel = new MemoryChannel(); - selectedPort = findFreePort(); - configureSourceAndChannel(source, channel, getDefaultNonSecureContext(selectedPort)); - channel.start(); - source.start(); + httpSource = new HTTPSource(); + httpChannel = new MemoryChannel(); + httpPort = findFreePort(); + configureSourceAndChannel(httpSource, httpChannel, getDefaultNonSecureContext(httpPort)); + httpChannel.start(); + httpSource.start(); httpsSource = new HTTPSource(); httpsChannel = new MemoryChannel(); - sslPort = findFreePort(); - configureSourceAndChannel(httpsSource, httpsChannel, getDefaultSecureContext(sslPort)); + httpsPort = findFreePort(); + configureSourceAndChannel(httpsSource, httpsChannel, getDefaultSecureContext(httpsPort)); httpsChannel.start(); httpsSource.start(); + + httpsGlobalKeystoreSource = new HTTPSource(); + httpsGlobalKeystoreChannel = new MemoryChannel(); + httpsGlobalKeystorePort = findFreePort(); + configureSourceAndChannel(httpsGlobalKeystoreSource, httpsGlobalKeystoreChannel, + getDefaultSecureContextGlobalKeystore(httpsGlobalKeystorePort)); + httpsGlobalKeystoreChannel.start(); + httpsGlobalKeystoreSource.start(); + + System.clearProperty("javax.net.ssl.keyStore"); + System.clearProperty("javax.net.ssl.keyStorePassword"); } private static void configureSourceAndChannel( @@ -158,17 +181,19 @@ public class TestHTTPSource { @AfterClass public static void tearDownClass() throws Exception { - source.stop(); - channel.stop(); + httpSource.stop(); + httpChannel.stop(); httpsSource.stop(); httpsChannel.stop(); + httpsGlobalKeystoreSource.stop(); + httpsGlobalKeystoreChannel.stop(); } @Before public void setUp() { HttpClientBuilder builder = HttpClientBuilder.create(); httpClient = builder.build(); - postRequest = new HttpPost("http://0.0.0.0:" + selectedPort); + postRequest = new HttpPost("http://0.0.0.0:" + httpPort); } @Test @@ -185,14 +210,14 @@ public class TestHTTPSource { Assert.assertEquals(HttpServletResponse.SC_OK, response.getStatusLine().getStatusCode()); - Transaction tx = channel.getTransaction(); + Transaction tx = httpChannel.getTransaction(); tx.begin(); - Event e = channel.take(); + Event e = httpChannel.take(); Assert.assertNotNull(e); Assert.assertEquals("b", e.getHeaders().get("a")); Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8")); - e = channel.take(); + e = httpChannel.take(); Assert.assertNotNull(e); Assert.assertEquals("f", e.getHeaders().get("e")); Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8")); @@ -202,12 +227,12 @@ public class TestHTTPSource { @Test public void testTrace() throws Exception { - doTestForbidden(new HttpTrace("http://0.0.0.0:" + selectedPort)); + doTestForbidden(new HttpTrace("http://0.0.0.0:" + httpPort)); } @Test public void testOptions() throws Exception { - doTestForbidden(new HttpOptions("http://0.0.0.0:" + selectedPort)); + doTestForbidden(new HttpOptions("http://0.0.0.0:" + httpPort)); } private void doTestForbidden(HttpRequestBase request) throws Exception { @@ -228,14 +253,14 @@ public class TestHTTPSource { Assert.assertEquals(HttpServletResponse.SC_OK, response.getStatusLine().getStatusCode()); - Transaction tx = channel.getTransaction(); + Transaction tx = httpChannel.getTransaction(); tx.begin(); - Event e = channel.take(); + Event e = httpChannel.take(); Assert.assertNotNull(e); Assert.assertEquals("b", e.getHeaders().get("a")); Assert.assertEquals("random_body", new String(e.getBody(), "UTF-16")); - e = channel.take(); + e = httpChannel.take(); Assert.assertNotNull(e); Assert.assertEquals("f", e.getHeaders().get("e")); Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-16")); @@ -253,7 +278,7 @@ public class TestHTTPSource { Assert.assertEquals(HttpServletResponse.SC_BAD_REQUEST, response.getStatusLine().getStatusCode()); - SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter"); + SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter"); Assert.assertEquals(1, sc.getEventReadFail()); } @@ -277,12 +302,12 @@ public class TestHTTPSource { public void testCounterGenericFail() throws Exception { ChannelProcessor cp = Mockito.mock(ChannelProcessor.class); doThrow(new RuntimeException("dummy")).when(cp).processEventBatch(anyListOf(Event.class)); - ChannelProcessor oldCp = source.getChannelProcessor(); - source.setChannelProcessor(cp); + ChannelProcessor oldCp = httpSource.getChannelProcessor(); + httpSource.setChannelProcessor(cp); testBatchWithVariousEncoding("UTF-8"); - SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter"); + SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter"); Assert.assertEquals(1, sc.getGenericProcessingFail()); - source.setChannelProcessor(oldCp); + httpSource.setChannelProcessor(oldCp); } @Test @@ -293,9 +318,9 @@ public class TestHTTPSource { postRequest.setEntity(input); httpClient.execute(postRequest); - Transaction tx = channel.getTransaction(); + Transaction tx = httpChannel.getTransaction(); tx.begin(); - Event e = channel.take(); + Event e = httpChannel.take(); Assert.assertNotNull(e); Assert.assertEquals("b", e.getHeaders().get("a")); Assert.assertEquals("random_body", new String(e.getBody(),"UTF-8")); @@ -323,9 +348,9 @@ public class TestHTTPSource { Assert.assertTrue(resp.getHeaders("X-Powered-By").length == 0); Assert.assertTrue(resp.getHeaders("Server").length == 1); - Transaction tx = channel.getTransaction(); + Transaction tx = httpChannel.getTransaction(); tx.begin(); - Event e = channel.take(); + Event e = httpChannel.take(); Assert.assertNotNull(e); tx.commit(); tx.close(); @@ -375,7 +400,7 @@ public class TestHTTPSource { newPostRequest = new HttpPost("http://0.0.0.0:" + newPort); try { - doTestHttps(null, newPort); + doTestHttps(null, newPort, httpsChannel); //We are testing that this fails because we've deliberately configured the wrong protocols Assert.assertTrue(false); } catch (AssertionError ex) { @@ -390,22 +415,22 @@ public class TestHTTPSource { HttpResponse response = putWithEncoding("UTF-8", 150).response; Assert.assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.getStatusLine().getStatusCode()); - SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter"); + SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter"); Assert.assertEquals(1, sc.getChannelWriteFail()); } @Test public void testFail() throws Exception { HTTPSourceHandler handler = field("handler").ofType(HTTPSourceHandler.class) - .in(source).get(); + .in(httpSource).get(); //Cause an exception in the source - this is equivalent to any exception //thrown by the handler since the handler is called inside a try-catch - field("handler").ofType(HTTPSourceHandler.class).in(source).set(null); + field("handler").ofType(HTTPSourceHandler.class).in(httpSource).set(null); HttpResponse response = putWithEncoding("UTF-8", 1).response; Assert.assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, response.getStatusLine().getStatusCode()); //Set the original handler back so tests don't fail after this runs. - field("handler").ofType(HTTPSourceHandler.class).in(source).set(handler); + field("handler").ofType(HTTPSourceHandler.class).in(httpSource).set(handler); } @Test @@ -458,15 +483,20 @@ public class TestHTTPSource { @Test public void testHttps() throws Exception { - doTestHttps(null, sslPort); + doTestHttps(null, httpsPort, httpsChannel); } @Test (expected = javax.net.ssl.SSLHandshakeException.class) public void testHttpsSSLv3() throws Exception { - doTestHttps("SSLv3", sslPort); + doTestHttps("SSLv3", httpsPort, httpsChannel); + } + + @Test + public void testHttpsGlobalKeystore() throws Exception { + doTestHttps(null, httpsGlobalKeystorePort, httpsGlobalKeystoreChannel); } - public void doTestHttps(String protocol, int port) throws Exception { + private void doTestHttps(String protocol, int port, Channel channel) throws Exception { Type listType = new TypeToken>() { }.getType(); List events = new ArrayList(); @@ -539,10 +569,10 @@ public class TestHTTPSource { int statusCode = httpsURLConnection.getResponseCode(); Assert.assertEquals(200, statusCode); - transaction = httpsChannel.getTransaction(); + transaction = channel.getTransaction(); transaction.begin(); for (int i = 0; i < 10; i++) { - Event e = httpsChannel.take(); + Event e = channel.take(); Assert.assertNotNull(e); Assert.assertEquals(String.valueOf(i), e.getHeaders().get("MsgNum")); } @@ -577,7 +607,7 @@ public class TestHTTPSource { String json = gson.toJson(events, listType); HttpURLConnection httpURLConnection = null; try { - URL url = new URL("http://0.0.0.0:" + sslPort); + URL url = new URL("http://0.0.0.0:" + httpsPort); httpURLConnection = (HttpURLConnection) url.openConnection(); httpURLConnection.setDoInput(true); httpURLConnection.setDoOutput(true); @@ -594,12 +624,12 @@ public class TestHTTPSource { } private void takeWithEncoding(String encoding, int n, List events) throws Exception { - Transaction tx = channel.getTransaction(); + Transaction tx = httpChannel.getTransaction(); tx.begin(); Event e = null; int i = 0; while (true) { - e = channel.take(); + e = httpChannel.take(); if (e == null) { break; } http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 3b0c183..16adc79 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -749,6 +749,102 @@ the selector will attempt to write the events to the optional channels. Any failures are simply ignored in that case. +SSL/TLS support +--------------- + +Several Flume components support the SSL/TLS protocols in order to communicate with other systems +securely. + +=============== ====================== +Component SSL server or client +=============== ====================== +Avro Source server +Avro Sink client +Thrift Source server +Thrift Sink client +Kafka Source client +Kafka Channel client +Kafka Sink client +HTTP Source server +JMS Source client +=============== ====================== + +The SSL compatible components have several configuration parameters to set up SSL, like +enable SSL flag, keystore / truststore parameters (location, password, type) and additional +SSL parameters (eg. disabled protocols). + +Enabling SSL for a component is always specified at component level in the agent configuration file. +So some components may be configured to use SSL while others not (even with the same component type). + +The keystore / truststore setup can be specified at component level or globally. + +In case of the component level setup, the keystore / truststore is configured in the agent +configuration file through component specific parameters. The advantage of this method is that the +components can use different keystores (if this would be needed). The disadvantage is that the +keystore parameters must be copied for each component in the agent configuration file. +The component level setup is optional, but if it is defined, it has higher precedence than +the global parameters. + +With the global setup, it is enough to define the keystore / truststore parameters once +and use the same settings for all components, which means less and more centralized configuration. + +The global setup can be configured either through system properties or through environment variables. + +================================== =============================== ================================== +System property Environment variable Description +================================== =============================== ================================== +javax.net.ssl.keyStore FLUME_SSL_KEYSTORE_PATH Keystore location +javax.net.ssl.keyStorePassword FLUME_SSL_KEYSTORE_PASSWORD Keystore password +javax.net.ssl.keyStoreType FLUME_SSL_KEYSTORE_TYPE Keystore type (by default JKS) +javax.net.ssl.trustStore FLUME_SSL_TRUSTSTORE_PATH Truststore location +javax.net.ssl.trustStorePassword FLUME_SSL_TRUSTSTORE_PASSWORD Truststore password +javax.net.ssl.trustStoreType FLUME_SSL_TRUSTSTORE_TYPE Truststore type (by default JKS) +================================== =============================== ================================== + +The SSL system properties can either be passed on the command line or by setting the ``JAVA_OPTS`` +environment variable in *conf/flume-env.sh*. (Although, using the command line is inadvisable because +the commands including the passwords will be saved to the command history.) + +.. code-block:: properties + + export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks" + export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password" + +Flume uses the system properties defined in JSSE (Java Secure Socket Extension), so this is +a standard way for setting up SSL. On the other hand, specifying passwords in system properties +means that the passwords can be seen in the process list. For cases where it is not acceptable, +it is also be possible to define the parameters in environment variables. Flume initializes +the JSSE system properties from the corresponding environment variables internally in this case. + +The SSL environment variables can either be set in the shell environment before +starting Flume or in *conf/flume-env.sh*. (Although, using the command line is inadvisable because +the commands including the passwords will be saved to the command history.) + +.. code-block:: properties + + export FLUME_SSL_KEYSTORE_PATH=/path/to/keystore.jks + export FLUME_SSL_KEYSTORE_PASSWORD=password + +**Please note:** + +* SSL must be enabled at component level. Specifying the global SSL parameters alone will not + have any effect. +* If the global SSL parameters are specified at multiple levels, the priority is the + following (from higher to lower): + + * component parameters in agent config + * system properties + * environment variables + +* If SSL is enabled for a component, but the SSL parameters are not specified in any of the ways + described above, then + + * in case of keystores: configuration error + * in case of truststores: the default truststore will be used (``jssecacerts`` / ``cacerts`` in Oracle JDK) +* The trustore password is optional in all cases. If not specified, then no integrity check will be + performed on the truststore when it is opened by the JDK. + + Flume Sources ------------- @@ -773,10 +869,19 @@ selector.* interceptors -- Space-separated list of interceptors interceptors.* compression-type none This can be "none" or "deflate". The compression-type must match the compression-type of matching AvroSource -ssl false Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password". -keystore -- This is the path to a Java keystore file. Required for SSL. -keystore-password -- The password for the Java keystore. Required for SSL. +ssl false Set this to true to enable SSL encryption. If SSL is enabled, + you must also specify a "keystore" and a "keystore-password", + either through component level parameters (see below) + or as global SSL parameters (see `SSL/TLS support`_ section). +keystore -- This is the path to a Java keystore file. + If not specified here, then the global keystore will be used + (if defined, otherwise configuration error). +keystore-password -- The password for the Java keystore. + If not specified here, then the global keystore password will be used + (if defined, otherwise configuration error). keystore-type JKS The type of the Java keystore. This can be "JKS" or "PKCS12". + If not specified here, then the global keystore type will be used + (if defined, otherwise the default is JKS). exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. ipFilter false Set this to true to enable ipFiltering for netty ipFilterRules -- Define N netty ipFilter pattern rules with this config. @@ -819,9 +924,9 @@ agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the kerberos KDC. Required properties are in **bold**. -================== =========== =================================================== +================== =========== ================================================================== Property Name Default Description -================== =========== =================================================== +================== =========== ================================================================== **channels** -- **type** -- The component type name, needs to be ``thrift`` **bind** -- hostname or IP address to listen on @@ -831,15 +936,24 @@ selector.type selector.* interceptors -- Space separated list of interceptors interceptors.* -ssl false Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password". -keystore -- This is the path to a Java keystore file. Required for SSL. -keystore-password -- The password for the Java keystore. Required for SSL. +ssl false Set this to true to enable SSL encryption. If SSL is enabled, + you must also specify a "keystore" and a "keystore-password", + either through component level parameters (see below) + or as global SSL parameters (see `SSL/TLS support`_ section) +keystore -- This is the path to a Java keystore file. + If not specified here, then the global keystore will be used + (if defined, otherwise configuration error). +keystore-password -- The password for the Java keystore. + If not specified here, then the global keystore password will be used + (if defined, otherwise configuration error). keystore-type JKS The type of the Java keystore. This can be "JKS" or "PKCS12". + If not specified here, then the global keystore type will be used + (if defined, otherwise the default is JKS). exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. kerberos false Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode, will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC. agent-principal -- The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC. agent-keytab —- The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC. -================== =========== =================================================== +================== =========== ================================================================== Example for agent named a1: @@ -963,8 +1077,8 @@ durableSubscriptionName -- Name used to identify the durable subsc ========================= =========== ============================================================== -Converter -''''''''' +JMS message converter +''''''''''''''''''''' The JMS source allows pluggable converters, though it's likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the @@ -998,8 +1112,8 @@ Example for agent named a1: a1.sources.r1.destinationType = QUEUE -SSL/TLS support -''''''''''''''' +SSL and JMS Source +'''''''''''''''''' JMS client implementations typically support to configure SSL/TLS via some Java system properties defined by JSSE (Java Secure Socket Extension). Specifying these system properties for Flume's JVM, JMS Source (or more precisely the @@ -1007,9 +1121,6 @@ JMS client implementation used by the JMS Source) can connect to the JMS server server has also been set up to use SSL). It should work with any JMS provider and has been tested with ActiveMQ, IBM MQ and Oracle WebLogic. -The JSSE Java system properties can either be passed on the command line or by setting the ``JAVA_OPTS`` environment -variable in *conf/flume-env.sh* (the examples below show the second approach). - The following sections describe the SSL configuration steps needed on the Flume side only. You can find more detailed descriptions about the server side setup of the different JMS providers and also full working configuration examples on Flume Wiki. @@ -1017,13 +1128,8 @@ Flume Wiki. **SSL transport / server authentication:** If the JMS server uses self-signed certificate or its certificate is signed by a non-trusted CA (eg. the company's own -CA), then a truststore (containing the right certificate) needs to be set up and passed to Flume via the following JSSE -Java system properties: - -.. code-block:: properties - - export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.trustStore=/path/to/truststore.jks" - export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.trustStorePassword=password" +CA), then a truststore (containing the right certificate) needs to be set up and passed to Flume. It can be done via +the global SSL parameters. For more details about the global SSL setup, see the `SSL/TLS support`_ section. Some JMS providers require SSL specific JNDI Initial Context Factory and/or Provider URL settings when using SSL (eg. ActiveMQ uses ssl:// URL prefix instead of tcp://). @@ -1035,13 +1141,8 @@ config file. JMS Source can authenticate to the JMS server through client certificate authentication instead of the usual user/password login (when SSL is used and the JMS server is configured to accept this kind of authentication). -The keystore containing Flume's key used for the authentication needs to be configured via the following JSSE Java -system properties (similarly to the truststore properties above): - -.. code-block:: properties - - export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks" - export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password" +The keystore containing Flume's key used for the authentication needs to be configured via the global SSL parameters +again. For more details about the global SSL setup, see the `SSL/TLS support`_ section. The keystore should contain only one key (if multiple keys are present, then the first one will be used). The key password must be the same as the keystore password. @@ -1049,6 +1150,13 @@ The key password must be the same as the keystore password. In case of client certificate authentication, it is not needed to specify the ``userName`` / ``passwordFile`` properties for the JMS Source in the Flume agent config file. +**Please note:** + +There are no component level configuration parameters for JMS Source unlike in case of other components. +No enable SSL flag either. +SSL setup is controlled by JNDI/Provider URL settings (ultimately the JMS server settings) and by the presence / absence +of the truststore / keystore. + Spooling Directory Source ~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -1440,9 +1548,12 @@ Example configuration with server side authentication and data encryption. a1.sources.source1.kafka.topics = mytopic a1.sources.source1.kafka.consumer.group.id = flume-consumer a1.sources.source1.kafka.consumer.security.protocol = SSL + # optional, the global truststore can be used alternatively a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks a1.sources.source1.kafka.consumer.ssl.truststore.password= +Specyfing the truststore is optional here, the global truststore can be used instead. +For more details about the global SSL setup, see the `SSL/TLS support`_ section. Note: By default the property ``ssl.endpoint.identification.algorithm`` is not defined, so hostname verification is not performed. @@ -1458,13 +1569,15 @@ against one of the following two fields: #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3 #) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6 -If client side authentication is also required then additionally the following should be added to Flume agent configuration. +If client side authentication is also required then additionally the following needs to be added to Flume agent +configuration or the global SSL setup can be used (see `SSL/TLS support`_ section). Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either individually or by their signature chain. Common example is to sign each client certificate by a single Root CA which in turn is trusted by Kafka brokers. .. code-block:: properties + # optional, the global keystore can be used alternatively a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks a1.sources.source1.kafka.consumer.ssl.keystore.password= @@ -1511,6 +1624,7 @@ Example secure configuration using SASL_SSL: a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka + # optional, the global truststore can be used alternatively a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks a1.sources.source1.kafka.consumer.ssl.truststore.password= @@ -1801,8 +1915,14 @@ interceptors -- Space-separa interceptors.* enableSSL false Set the property true, to enable SSL. *HTTP Source does not support SSLv3.* excludeProtocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded. -keystore Location of the keystore includng keystore file name -keystorePassword Keystore password +keystore Location of the keystore including keystore file name. + If SSL is enabled but the keystore is not specified here, + then the global keystore will be used + (if defined, otherwise configuration error). +keystorePassword Keystore password. + If SSL is enabled but the keystore password is not specified here, + then the global keystore password will be used + (if defined, otherwise configuration error). QueuedThreadPool.* Jetty specific settings to be set on org.eclipse.jetty.util.thread.QueuedThreadPool. N.B. QueuedThreadPool will only be used if at least one property of this class is set. HttpConfiguration.* Jetty specific settings to be set on org.eclipse.jetty.server.HttpConfiguration @@ -2372,9 +2492,9 @@ compression-type none compression-level 6 The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression ssl false Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs". trust-all-certs false If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on the encrypted connection. -truststore -- The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used. -truststore-password -- The password for the specified truststore. -truststore-type JKS The type of the Java truststore. This can be "JKS" or other supported Java truststore type. +truststore -- The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore not specified either, then the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used. +truststore-password -- The password for the truststore. If not specified, then the global keystore password will be used (if defined). +truststore-type JKS The type of the Java truststore. This can be "JKS" or other supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the defautl is JKS). exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. maxIoWorkers 2 * the number of available processors in the machine The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory. ========================== ===================================================== =========================================================================================== @@ -2417,9 +2537,9 @@ connect-timeout 20000 Amount of time (ms) to allow for the first request-timeout 20000 Amount of time (ms) to allow for requests after the first. connection-reset-interval none Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent. ssl false Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a "truststore", "truststore-password" and "truststore-type" -truststore -- The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used. -truststore-password -- The password for the specified truststore. -truststore-type JKS The type of the Java truststore. This can be "JKS" or other supported Java truststore type. +truststore -- The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source's SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore not specified either, then the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used. +truststore-password -- The password for the truststore. If not specified, then the global keystore password will be used (if defined). +truststore-type JKS The type of the Java truststore. This can be "JKS" or other supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the defautl is JKS). exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude kerberos false Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for successful authentication and communication to a kerberos enabled Thrift Source. client-principal —- The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC. @@ -3002,9 +3122,12 @@ Example configuration with server side authentication and data encryption. a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 a1.sinks.sink1.kafka.topic = mytopic a1.sinks.sink1.kafka.producer.security.protocol = SSL + # optional, the global truststore can be used alternatively a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks a1.sinks.sink1.kafka.producer.ssl.truststore.password = +Specyfing the truststore is optional here, the global truststore can be used instead. +For more details about the global SSL setup, see the `SSL/TLS support`_ section. Note: By default the property ``ssl.endpoint.identification.algorithm`` is not defined, so hostname verification is not performed. @@ -3020,13 +3143,15 @@ against one of the following two fields: #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3 #) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6 -If client side authentication is also required then additionally the following should be added to Flume agent configuration. +If client side authentication is also required then additionally the following needs to be added to Flume agent +configuration or the global SSL setup can be used (see `SSL/TLS support`_ section). Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either individually or by their signature chain. Common example is to sign each client certificate by a single Root CA which in turn is trusted by Kafka brokers. .. code-block:: properties + # optional, the global keystore can be used alternatively a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks a1.sinks.sink1.kafka.producer.ssl.keystore.password = @@ -3072,6 +3197,7 @@ Example secure configuration using SASL_SSL: a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka + # optional, the global truststore can be used alternatively a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks a1.sinks.sink1.kafka.producer.ssl.truststore.password = @@ -3401,12 +3527,16 @@ Example configuration with server side authentication and data encryption. a1.channels.channel1.kafka.topic = channel1 a1.channels.channel1.kafka.consumer.group.id = flume-consumer a1.channels.channel1.kafka.producer.security.protocol = SSL + # optional, the global truststore can be used alternatively a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks a1.channels.channel1.kafka.producer.ssl.truststore.password = a1.channels.channel1.kafka.consumer.security.protocol = SSL + # optional, the global truststore can be used alternatively a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks a1.channels.channel1.kafka.consumer.ssl.truststore.password = +Specyfing the truststore is optional here, the global truststore can be used instead. +For more details about the global SSL setup, see the `SSL/TLS support`_ section. Note: By default the property ``ssl.endpoint.identification.algorithm`` is not defined, so hostname verification is not performed. @@ -3423,15 +3553,18 @@ against one of the following two fields: #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3 #) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6 -If client side authentication is also required then additionally the following should be added to Flume agent configuration. +If client side authentication is also required then additionally the following needs to be added to Flume agent +configuration or the global SSL setup can be used (see `SSL/TLS support`_ section). Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either individually or by their signature chain. Common example is to sign each client certificate by a single Root CA which in turn is trusted by Kafka brokers. .. code-block:: properties + # optional, the global keystore can be used alternatively a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks a1.channels.channel1.kafka.producer.ssl.keystore.password = + # optional, the global keystore can be used alternatively a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks a1.channels.channel1.kafka.consumer.ssl.keystore.password = @@ -3482,11 +3615,13 @@ Example secure configuration using SASL_SSL: a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka + # optional, the global truststore can be used alternatively a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks a1.channels.channel1.kafka.producer.ssl.truststore.password = a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka + # optional, the global truststore can be used alternatively a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks a1.channels.channel1.kafka.consumer.ssl.truststore.password = http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-node/src/main/java/org/apache/flume/node/Application.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java index 7111f60..406bb7d 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java @@ -41,6 +41,7 @@ import org.apache.flume.lifecycle.LifecycleAware; import org.apache.flume.lifecycle.LifecycleState; import org.apache.flume.lifecycle.LifecycleSupervisor; import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy; +import org.apache.flume.util.SSLUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -251,8 +252,7 @@ public class Application { public static void main(String[] args) { try { - - boolean isZkConfigured = false; + SSLUtil.initGlobalSSLParameters(); Options options = new Options(); @@ -294,10 +294,12 @@ public class Application { String agentName = commandLine.getOptionValue('n'); boolean reload = !commandLine.hasOption("no-reload-conf"); + boolean isZkConfigured = false; if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) { isZkConfigured = true; } - Application application = null; + + Application application; if (isZkConfigured) { // get options String zkConnectionStr = commandLine.getOptionValue('z'); http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java index b61eb79..9bcdf51 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java @@ -65,6 +65,7 @@ import org.apache.flume.FlumeException; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.flume.source.avro.AvroSourceProtocol; import org.apache.flume.source.avro.Status; +import org.apache.flume.util.SSLUtil; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; @@ -586,11 +587,13 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient { trustAllCerts = Boolean.parseBoolean(properties.getProperty( RpcClientConfigurationConstants.CONFIG_TRUST_ALL_CERTS)); truststore = properties.getProperty( - RpcClientConfigurationConstants.CONFIG_TRUSTSTORE); + RpcClientConfigurationConstants.CONFIG_TRUSTSTORE, SSLUtil.getGlobalTruststorePath()); truststorePassword = properties.getProperty( - RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD); + RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD, + SSLUtil.getGlobalTruststorePassword()); truststoreType = properties.getProperty( - RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS"); + RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, + SSLUtil.getGlobalTruststoreType("JKS")); String excludeProtocolsStr = properties.getProperty( RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS); if (excludeProtocolsStr == null) { @@ -716,12 +719,10 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient { KeyStore keystore = null; if (truststore != null) { - if (truststorePassword == null) { - throw new NullPointerException("truststore password is null"); - } InputStream truststoreStream = new FileInputStream(truststore); keystore = KeyStore.getInstance(truststoreType); - keystore.load(truststoreStream, truststorePassword.toCharArray()); + keystore.load(truststoreStream, + truststorePassword != null ? truststorePassword.toCharArray() : null); } TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");