Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 268192004A1 for ; Wed, 9 Aug 2017 15:34:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 24F3516923F; Wed, 9 Aug 2017 13:34:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 46E7A169243 for ; Wed, 9 Aug 2017 15:34:45 +0200 (CEST) Received: (qmail 45643 invoked by uid 500); 9 Aug 2017 13:34:39 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 45605 invoked by uid 99); 9 Aug 2017 13:34:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Aug 2017 13:34:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 00081E10F8; Wed, 9 Aug 2017 13:34:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: jbonofre@apache.org To: commits@beam.apache.org Date: Wed, 09 Aug 2017 13:34:38 -0000 Message-Id: <00d203959891435085836524c69f7c50@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] beam git commit: [BEAM-1274] Add SSL mutual authentication support archived-at: Wed, 09 Aug 2017 13:34:46 -0000 Repository: beam Updated Branches: refs/heads/master db4b0939a -> 04f5bc6f8 [BEAM-1274] Add SSL mutual authentication support Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f48bb4be Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f48bb4be Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f48bb4be Branch: refs/heads/master Commit: f48bb4be1d3bb23f3cc978c4c25cf43842639296 Parents: aadbe36 Author: Jean-Baptiste Onofré Authored: Mon Jul 24 17:53:15 2017 +0200 Committer: Jean-Baptiste Onofré Committed: Mon Aug 7 07:28:14 2017 +0200 ---------------------------------------------------------------------- .../sdk/io/elasticsearch/ElasticsearchIO.java | 69 +++++++++++++++++++- 1 file changed, 67 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f48bb4be/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 5046888..2cd3bcd 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -25,10 +25,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; + +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.Serializable; -import java.net.MalformedURLException; import java.net.URL; +import java.security.KeyStore; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -39,6 +43,8 @@ import java.util.ListIterator; import java.util.Map; import java.util.NoSuchElementException; import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; + import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -60,7 +66,9 @@ import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.message.BasicHeader; +import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.ssl.SSLContexts; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; @@ -155,6 +163,12 @@ public class ElasticsearchIO { @Nullable abstract String getPassword(); + @Nullable + abstract String getKeystorePath(); + + @Nullable + abstract String getKeystorePassword(); + abstract String getIndex(); abstract String getType(); @@ -169,6 +183,10 @@ public class ElasticsearchIO { abstract Builder setPassword(String password); + abstract Builder setKeystorePath(String keystorePath); + + abstract Builder setKeystorePassword(String password); + abstract Builder setIndex(String index); abstract Builder setType(String type); @@ -239,6 +257,32 @@ public class ElasticsearchIO { return builder().setPassword(password).build(); } + /** + * If Elasticsearch uses SSL with mutual authentication (via shield), + * provide the keystore containing the client key. + * + * @param keystorePath the location of the keystore containing the client key. + * @return the {@link ConnectionConfiguration} object with keystore path set. + */ + public ConnectionConfiguration withKeystorePath(String keystorePath) { + checkArgument(keystorePath != null, "ConnectionConfiguration.create()" + + ".withKeystorePath(keystorePath) called with null keystorePath"); + return builder().setKeystorePath(keystorePath).build(); + } + + /** + * If Elasticsearch uses SSL with mutual authentication (via shield), + * provide the password to open the client keystore. + * + * @param keystorePassword the password of the client keystore. + * @return the {@link ConnectionConfiguration} object with keystore password set. + */ + public ConnectionConfiguration withKeystorePassword(String keystorePassword) { + checkArgument(keystorePassword != null, "ConnectionConfiguration.create()" + + ".withKeystorePassword(keystorePassword) called with null keystorePassword"); + return builder().setKeystorePassword(keystorePassword).build(); + } + private void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("address", getAddresses().toString())); builder.add(DisplayData.item("index", getIndex())); @@ -246,7 +290,7 @@ public class ElasticsearchIO { builder.addIfNotNull(DisplayData.item("username", getUsername())); } - RestClient createClient() throws MalformedURLException { + RestClient createClient() throws IOException { HttpHost[] hosts = new HttpHost[getAddresses().size()]; int i = 0; for (String address : getAddresses()) { @@ -267,6 +311,27 @@ public class ElasticsearchIO { } }); } + if (getKeystorePath() != null) { + try { + KeyStore keyStore = KeyStore.getInstance("jks"); + try (InputStream is = new FileInputStream(new File(getKeystorePath()))) { + keyStore.load(is, getKeystorePassword().toCharArray()); + } + final SSLContext sslContext = SSLContexts.custom() + .loadTrustMaterial(keyStore, null).build(); + final SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext); + restClientBuilder.setHttpClientConfigCallback( + new RestClientBuilder.HttpClientConfigCallback() { + @Override + public HttpAsyncClientBuilder customizeHttpClient( + HttpAsyncClientBuilder httpClientBuilder) { + return httpClientBuilder.setSSLContext(sslContext).setSSLStrategy(sessionStrategy); + } + }); + } catch (Exception e) { + throw new IOException("Can't load the client certificate from the keystore", e); + } + } return restClientBuilder.build(); } }