ratis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject incubator-ratis git commit: RATIS-246. Support secure gRPC endpoint with mTLS in Ratis. Contributed by Xiaoyu Yao
Date Sat, 15 Dec 2018 07:12:34 GMT
Repository: incubator-ratis
Updated Branches:
  refs/heads/master 76e89911a -> c038d2e21


RATIS-246. Support secure gRPC endpoint with mTLS in Ratis.  Contributed by Xiaoyu Yao


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c038d2e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c038d2e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c038d2e2

Branch: refs/heads/master
Commit: c038d2e215620d312407e14843949f6873cd7b04
Parents: 76e8991
Author: Tsz Wo Nicholas Sze <szetszwo@apache.org>
Authored: Sat Dec 15 15:11:29 2018 +0800
Committer: Tsz Wo Nicholas Sze <szetszwo@apache.org>
Committed: Sat Dec 15 15:11:29 2018 +0800

----------------------------------------------------------------------
 .../org/apache/ratis/grpc/GrpcConfigKeys.java   | 50 ++++++++++++++
 .../java/org/apache/ratis/grpc/GrpcFactory.java | 23 ++++++-
 .../org/apache/ratis/grpc/GrpcTlsConfig.java    | 68 ++++++++++++++++++++
 .../grpc/client/GrpcClientProtocolClient.java   | 32 +++++++--
 .../grpc/client/GrpcClientProtocolProxy.java    |  5 +-
 .../apache/ratis/grpc/client/GrpcClientRpc.java | 12 +++-
 .../ratis/grpc/client/GrpcClientStreamer.java   |  7 +-
 .../ratis/grpc/client/GrpcOutputStream.java     |  5 +-
 .../grpc/server/GrpcServerProtocolClient.java   | 30 +++++++--
 .../apache/ratis/grpc/server/GrpcService.java   | 55 +++++++++++++---
 .../org/apache/ratis/grpc/TestRaftStream.java   |  8 +--
 11 files changed, 261 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c038d2e2/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index 5147d8c..1f18810 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.grpc;
 
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
@@ -31,6 +32,55 @@ import static org.apache.ratis.conf.ConfUtils.*;
 public interface GrpcConfigKeys {
   String PREFIX = "raft.grpc";
 
+  interface TLS {
+    Logger LOG = LoggerFactory.getLogger(TLS.class);
+    static Consumer<String> getDefaultLog() {
+      return LOG::info;
+    }
+
+    String TLS_ROOT_PREFIX = GrpcConfigKeys.PREFIX + ".tls";
+
+    String TLS_ENABLED_KEY = TLS_ROOT_PREFIX + ".enabled";
+    boolean TLS_ENABLED_DEFAULT = false;
+    static boolean tlsEnabled(RaftProperties properties) {
+      return getBoolean(properties::getBoolean, TLS_ENABLED_KEY, TLS_ENABLED_DEFAULT, getDefaultLog());
+    }
+
+    String MUTUAL_AUTHN_ENABLED_KEY = TLS_ROOT_PREFIX + ".mutual_authn.enabled";
+    boolean MUTUAL_AUTHN_ENABLED_DEFAULT = false;
+    static boolean mutualAuthnEnabled(RaftProperties properties) {
+      return getBoolean(properties::getBoolean,
+          MUTUAL_AUTHN_ENABLED_KEY, MUTUAL_AUTHN_ENABLED_DEFAULT, getDefaultLog());
+    }
+
+    String PRIVATE_KEY_FILE_KEY = TLS_ROOT_PREFIX + ".private.key.file.name";
+    String PRIVATE_KEY_FILE_DEFAULT = "private.pem";
+    static String getPrivateKeyFile(RaftProperties properties) {
+      return get(properties::get, PRIVATE_KEY_FILE_KEY, PRIVATE_KEY_FILE_DEFAULT, getDefaultLog());
+    }
+
+    String CERT_CHAIN_FILE_KEY = TLS_ROOT_PREFIX + ".cert.chain.file.name";
+    String CERT_CHAIN_FILE_DEFAULT = "certificate.crt";
+    static String getCertChainFile(RaftProperties properties) {
+      return get(properties::get, CERT_CHAIN_FILE_KEY, CERT_CHAIN_FILE_DEFAULT, getDefaultLog());
+    }
+
+    String TRUST_STORE_KEY = TLS_ROOT_PREFIX + ".trust.store";
+    String TRUST_STORE_DEFAULT = "ca.crt";
+    static String getTrustStore(RaftProperties properties) {
+      return get(properties::get, TRUST_STORE_KEY, TRUST_STORE_DEFAULT, getDefaultLog());
+    }
+
+    String CONF_PARAMETER = TLS_ROOT_PREFIX + ".conf";
+    Class<GrpcTlsConfig> CONF_CLASS = GrpcTlsConfig.class;
+    static GrpcTlsConfig getConf(Parameters parameters) {
+      return parameters != null ? parameters.get(CONF_PARAMETER, CONF_CLASS): null;
+    }
+    static void setConf(Parameters parameters, GrpcTlsConfig conf) {
+      parameters.put(CONF_PARAMETER, conf, GrpcTlsConfig.class);
+    }
+  }
+
   interface Server {
     Logger LOG = LoggerFactory.getLogger(Server.class);
     static Consumer<String> getDefaultLog() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c038d2e2/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index 836ee1c..d4946e7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -29,7 +29,25 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.*;
 
 public class GrpcFactory implements ServerFactory, ClientFactory {
-  public GrpcFactory(Parameters parameters) {}
+  private final GrpcTlsConfig tlsConfig;
+
+  public static Parameters newRaftParameters(GrpcTlsConfig conf) {
+    final Parameters p = new Parameters();
+    GrpcConfigKeys.TLS.setConf(p, conf);
+    return p;
+  }
+
+  public GrpcFactory(Parameters parameters) {
+    this(GrpcConfigKeys.TLS.getConf(parameters));
+  }
+
+  public GrpcFactory(GrpcTlsConfig tlsConfig) {
+    this.tlsConfig = tlsConfig;
+  }
+
+  public GrpcTlsConfig getTlsConfig() {
+    return tlsConfig;
+  }
 
   @Override
   public SupportedRpcType getRpcType() {
@@ -46,11 +64,12 @@ public class GrpcFactory implements ServerFactory, ClientFactory {
   public GrpcService newRaftServerRpc(RaftServer server) {
     return GrpcService.newBuilder()
         .setServer(server)
+        .setTlsConfig(tlsConfig)
         .build();
   }
 
   @Override
   public GrpcClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properties) {
-    return new GrpcClientRpc(clientId, properties);
+    return new GrpcClientRpc(clientId, properties, getTlsConfig());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c038d2e2/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcTlsConfig.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcTlsConfig.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcTlsConfig.java
new file mode 100644
index 0000000..960bd1d
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcTlsConfig.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import java.io.File;
+
+/**
+ * Ratis GRPC TLS configurations.
+ */
+public class GrpcTlsConfig {
+  // private key
+  private final File privateKey;
+
+  // certificate
+  private final File certChain;
+
+  // ca certificate
+  private final File trustStore;
+
+  // mutual TLS enabled
+  private final boolean mTlsEnabled;
+
+  public File getPrivateKey() {
+    return privateKey;
+  }
+
+  public File getCertChain() {
+    return certChain;
+  }
+
+  public File getTrustStore() {
+    return trustStore;
+  }
+
+  public boolean getMtlsEnabled() {
+    return mTlsEnabled;
+  }
+
+  public GrpcTlsConfig(File privateKey, File certChain, File trustStore, boolean mTlsEnabled)
{
+    this.privateKey = privateKey;
+    this.certChain = certChain;
+    this.trustStore = trustStore;
+    this.mTlsEnabled = mTlsEnabled;
+  }
+
+  @Override
+  public String toString() {
+    return "PrivateKey:" + getPrivateKey().getAbsolutePath() +
+        " Certificate:" + getCertChain().getAbsolutePath() +
+        " TrustStore:" + getTrustStore().getAbsolutePath() +
+        " Mutual TlS:" + getMtlsEnabled();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c038d2e2/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 8a1b111..8e6503d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -21,6 +21,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto;
 import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
@@ -30,6 +31,8 @@ import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.apache.ratis.proto.grpc.AdminProtocolServiceGrpc;
 import org.apache.ratis.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub;
 import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc;
@@ -81,15 +84,34 @@ public class GrpcClientProtocolClient implements Closeable {
 
   private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
 
-  public GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties)
{
+  public GrpcClientProtocolClient(ClientId id, RaftPeer target,
+                                  RaftProperties properties,
+                                  GrpcTlsConfig tlsConf) {
     this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
     this.target = target;
-
     final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
     final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
-    channel = NettyChannelBuilder.forTarget(target.getAddress())
-        .negotiationType(NegotiationType.PLAINTEXT)
-        .flowControlWindow(flowControlWindow.getSizeInt())
+    NettyChannelBuilder channelBuilder =
+        NettyChannelBuilder.forTarget(target.getAddress());
+
+    if (tlsConf!= null) {
+      SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
+      if (tlsConf.getTrustStore() != null) {
+        sslContextBuilder.trustManager(tlsConf.getTrustStore());
+      }
+      if (tlsConf.getMtlsEnabled()) {
+        sslContextBuilder.keyManager(tlsConf.getCertChain(),
+            tlsConf.getPrivateKey());
+      }
+      try {
+        channelBuilder.useTransportSecurity().sslContext(sslContextBuilder.build());
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    } else {
+      channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
+    }
+    channel = channelBuilder.flowControlWindow(flowControlWindow.getSizeInt())
         .maxInboundMessageSize(maxMessageSize.getSizeInt())
         .build();
     blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c038d2e2/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
index efe19f4..3a3941b 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.grpc.client;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
@@ -35,8 +36,8 @@ public class GrpcClientProtocolProxy implements Closeable {
 
   public GrpcClientProtocolProxy(ClientId clientId, RaftPeer target,
       Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation,
-      RaftProperties properties) {
-    proxy = new GrpcClientProtocolClient(clientId, target, properties);
+      RaftProperties properties, GrpcTlsConfig tlsConfig) {
+    proxy = new GrpcClientProtocolClient(clientId, target, properties, tlsConfig);
     this.responseHandlerCreation = responseHandlerCreation;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c038d2e2/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 191aaca..25fcc85 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -21,6 +21,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
@@ -46,11 +47,18 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
 
   private final ClientId clientId;
   private final int maxMessageSize;
+  private final GrpcTlsConfig tlsConfig;
 
-  public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
-    super(new PeerProxyMap<>(clientId.toString(), p -> new GrpcClientProtocolClient(clientId,
p, properties)));
+  public GrpcClientRpc(ClientId clientId, RaftProperties properties, GrpcTlsConfig tlsConfig)
{
+    super(new PeerProxyMap<>(clientId.toString(),
+        p -> new GrpcClientProtocolClient(clientId, p, properties, tlsConfig)));
     this.clientId = clientId;
     this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
+    this.tlsConfig = tlsConfig;
+  }
+
+  public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
+    this(clientId, properties, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c038d2e2/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
index 71e39b1..1b33392 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc.client;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -85,7 +86,7 @@ public class GrpcClientStreamer implements Closeable {
   private final RaftGroupId groupId;
 
   GrpcClientStreamer(RaftProperties prop, RaftGroup group,
-      RaftPeerId leaderId, ClientId clientId) {
+      RaftPeerId leaderId, ClientId clientId, GrpcTlsConfig tlsConfig) {
     this.clientId = clientId;
     maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop);
     maxMessageSize = GrpcConfigKeys.messageSizeMax(prop, LOG::debug);
@@ -97,8 +98,8 @@ public class GrpcClientStreamer implements Closeable {
     this.peers = group.getPeers().stream().collect(
         Collectors.toMap(RaftPeer::getId, Function.identity()));
     proxyMap = new PeerProxyMap<>(clientId.toString(),
-        raftPeer -> new GrpcClientProtocolProxy(clientId, raftPeer, ResponseHandler::new,
-            prop));
+        raftPeer -> new GrpcClientProtocolProxy(clientId, raftPeer,
+            ResponseHandler::new, prop, tlsConfig));
     proxyMap.addPeers(group.getPeers());
     refreshLeaderProxy(leaderId, null);
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c038d2e2/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
index e857aaf..e2eff08 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
@@ -19,6 +19,7 @@ package org.apache.ratis.grpc.client;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -39,12 +40,12 @@ public class GrpcOutputStream extends OutputStream {
   private boolean closed = false;
 
   public GrpcOutputStream(RaftProperties prop, ClientId clientId,
-      RaftGroup group, RaftPeerId leaderId) {
+      RaftGroup group, RaftPeerId leaderId, GrpcTlsConfig tlsConfig) {
     final int bufferSize = GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt();
     buf = new byte[bufferSize];
     count = 0;
     this.clientId = clientId;
-    streamer = new GrpcClientStreamer(prop, group, leaderId, clientId);
+    streamer = new GrpcClientStreamer(prop, group, leaderId, clientId, tlsConfig);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c038d2e2/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 89675db..a370d72 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -17,7 +17,9 @@
  */
 package org.apache.ratis.grpc.server;
 
+import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
@@ -26,6 +28,7 @@ import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc;
 import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub;
 import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.apache.ratis.util.TimeDuration;
 
 import java.io.Closeable;
@@ -41,11 +44,28 @@ public class GrpcServerProtocolClient implements Closeable {
   private final RaftServerProtocolServiceStub asyncStub;
 
   public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
-      TimeDuration requestTimeoutDuration) {
-    channel = NettyChannelBuilder.forTarget(target.getAddress())
-        .negotiationType(NegotiationType.PLAINTEXT)
-        .flowControlWindow(flowControlWindow)
-        .build();
+      TimeDuration requestTimeoutDuration, GrpcTlsConfig tlsConfig) {
+    NettyChannelBuilder channelBuilder =
+        NettyChannelBuilder.forTarget(target.getAddress());
+
+    if (tlsConfig!= null) {
+      SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
+      if (tlsConfig.getTrustStore() != null) {
+        sslContextBuilder.trustManager(tlsConfig.getTrustStore());
+      }
+      if (tlsConfig.getMtlsEnabled()) {
+        sslContextBuilder.keyManager(tlsConfig.getCertChain(),
+            tlsConfig.getPrivateKey());
+      }
+      try {
+        channelBuilder.useTransportSecurity().sslContext(sslContextBuilder.build());
+      } catch (Exception ex) {
+        throw new IllegalArgumentException("Failed to build SslContext, tlsConfig=" + tlsConfig,
ex);
+      }
+    } else {
+      channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
+    }
+    channel = channelBuilder.flowControlWindow(flowControlWindow).build();
     blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
     asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
     this.requestTimeoutDuration = requestTimeoutDuration;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c038d2e2/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 4bd370f..5139aff 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.grpc.server;
 
 import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.client.GrpcClientProtocolService;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -25,8 +26,12 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
-import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
+
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.util.*;
 import org.slf4j.Logger;
@@ -36,6 +41,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.function.Supplier;
 
+import static org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL;
+
 /** A grpc implementation of {@link RaftServerRpc}. */
 public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient, PeerProxyMap<GrpcServerProtocolClient>>
{
   static final Logger LOG = LoggerFactory.getLogger(GrpcService.class);
@@ -43,6 +50,8 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
       GrpcService.class.getSimpleName() + ".sendRequest";
 
   public static class Builder extends RaftServerRpc.Builder<Builder, GrpcService> {
+    private GrpcTlsConfig tlsConfig;
+
     private Builder() {}
 
     @Override
@@ -52,7 +61,16 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
 
     @Override
     public GrpcService build() {
-      return new GrpcService(getServer());
+      return new GrpcService(getServer(), getTlsConfig());
+    }
+
+    public Builder setTlsConfig(GrpcTlsConfig tlsConfig) {
+      this.tlsConfig = tlsConfig;
+      return this;
+    }
+
+    public GrpcTlsConfig getTlsConfig() {
+      return tlsConfig;
     }
   }
 
@@ -63,32 +81,51 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
   private final Server server;
   private final Supplier<InetSocketAddress> addressSupplier;
 
-  private GrpcService(RaftServer server) {
+  private GrpcService(RaftServer server, GrpcTlsConfig tlsConfig) {
     this(server, server::getId,
         GrpcConfigKeys.Server.port(server.getProperties()),
         GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
         RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
         GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
-        RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
+        RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()),
+        tlsConfig);
   }
+
   private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, int port,
       SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
-      SizeInBytes flowControlWindow, TimeDuration requestTimeoutDuration) {
+      SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration, GrpcTlsConfig tlsConfig)
{
     super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
-        p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(), requestTimeoutDuration)));
+        p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(),
+            requestTimeoutDuration, tlsConfig)));
     if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
       throw new IllegalArgumentException("Illegal configuration: "
           + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + " = " + appenderBufferSize
           + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
     }
 
-    server = NettyServerBuilder.forPort(port)
+    NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
         .flowControlWindow(flowControlWindow.getSizeInt())
         .addService(new GrpcServerProtocolService(idSupplier, raftServer))
         .addService(new GrpcClientProtocolService(idSupplier, raftServer))
-        .addService(new GrpcAdminProtocolService(raftServer))
-        .build();
+        .addService(new GrpcAdminProtocolService(raftServer));
+
+    if (tlsConfig != null) {
+      SslContextBuilder sslContextBuilder =
+          SslContextBuilder.forServer(tlsConfig.getCertChain(),
+              tlsConfig.getPrivateKey());
+      if (tlsConfig.getMtlsEnabled()) {
+        sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
+        sslContextBuilder.trustManager(tlsConfig.getCertChain());
+      }
+      sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, OPENSSL);
+      try {
+        nettyServerBuilder.sslContext(sslContextBuilder.build());
+      } catch (Exception ex) {
+        throw new IllegalArgumentException("Failed to build SslContext, tlsConfig=" + tlsConfig,
ex);
+      }
+    }
+    server = nettyServerBuilder.build();
     addressSupplier = JavaUtils.memoize(() -> new InetSocketAddress(port != 0? port: server.getPort()));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c038d2e2/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index ba31b2b..8886485 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -91,7 +91,7 @@ public class TestRaftStream extends BaseTest {
     RaftServerImpl leader = waitForLeader(cluster);
 
     try (GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
-        cluster.getGroup(), leader.getId())) {
+        cluster.getGroup(), leader.getId(), null)) {
       for (int i = 0; i < numRequests; i++) { // generate requests
         out.write(toBytes(i));
       }
@@ -129,7 +129,7 @@ public class TestRaftStream extends BaseTest {
 
     RaftServerImpl leader = waitForLeader(cluster);
     GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
-        cluster.getGroup(), leader.getId());
+        cluster.getGroup(), leader.getId(), null);
 
     int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072};
     ByteValue[] values = new ByteValue[lengths.length];
@@ -208,7 +208,7 @@ public class TestRaftStream extends BaseTest {
     RaftServerImpl leader = waitForLeader(cluster);
 
     GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
-        cluster.getGroup(), leader.getId());
+        cluster.getGroup(), leader.getId(), null);
 
     byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2];
     Arrays.fill(b1, (byte) 1);
@@ -274,7 +274,7 @@ public class TestRaftStream extends BaseTest {
       LOG.info("Writer thread starts");
       int count = 0;
       try (GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
-          cluster.getGroup(), leader.getId())) {
+          cluster.getGroup(), leader.getId(), null)) {
         while (running.get()) {
           out.write(toBytes(count++));
           Thread.sleep(10);


Mime
View raw message