Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5F10E200CDB for ; Sat, 22 Jul 2017 00:07:49 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5D1EF16E02E; Fri, 21 Jul 2017 22:07:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2C49D16E021 for ; Sat, 22 Jul 2017 00:07:47 +0200 (CEST) Received: (qmail 87408 invoked by uid 500); 21 Jul 2017 22:07:46 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 87390 invoked by uid 99); 21 Jul 2017 22:07:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Jul 2017 22:07:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 60DC5ED30B; Fri, 21 Jul 2017 22:07:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Fri, 21 Jul 2017 22:07:46 -0000 Message-Id: <6e5db1473cdc4a05ab3ed3e80cadd877@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Fri, 21 Jul 2017 22:07:49 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/ca5b0275/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.DecryptHandler.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.DecryptHandler.html b/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.DecryptHandler.html index 42b6f60..37a733a 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.DecryptHandler.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.DecryptHandler.html @@ -28,10 +28,10 @@ 020import static io.netty.handler.timeout.IdleState.READER_IDLE; 021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 022 -023import com.google.common.base.Charsets; -024import com.google.common.base.Throwables; -025import com.google.common.collect.ImmutableSet; -026import com.google.common.collect.Maps; +023import org.apache.hadoop.hbase.shaded.com.google.common.base.Charsets; +024import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; +025import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet; +026import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; 027import com.google.protobuf.CodedOutputStream; 028 029import io.netty.buffer.ByteBuf; @@ -93,7 +93,7 @@ 085import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; 086import org.apache.hadoop.fs.FileEncryptionInfo; 087import org.apache.hadoop.hbase.classification.InterfaceAudience; -088import org.apache.hadoop.hbase.util.ByteStringer; +088import com.google.protobuf.ByteString; 089import org.apache.hadoop.hdfs.DFSClient; 090import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 091import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -373,416 +373,418 @@ 365 DataTransferEncryptorMessageProto.newBuilder(); 366 builder.setStatus(DataTransferEncryptorStatus.SUCCESS); 367 if (payload != null) { -368 builder.setPayload(ByteStringer.wrap(payload)); -369 } -370 if (options != null) { -371 builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options)); -372 } -373 DataTransferEncryptorMessageProto proto = builder.build(); -374 int size = proto.getSerializedSize(); -375 size += CodedOutputStream.computeRawVarint32Size(size); -376 ByteBuf buf = ctx.alloc().buffer(size); -377 proto.writeDelimitedTo(new ByteBufOutputStream(buf)); -378 ctx.write(buf); -379 } -380 -381 @Override -382 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { -383 ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); -384 sendSaslMessage(ctx, new byte[0]); -385 ctx.flush(); -386 step++; -387 } -388 -389 @Override -390 public void channelInactive(ChannelHandlerContext ctx) throws Exception { -391 saslClient.dispose(); -392 } -393 -394 private void check(DataTransferEncryptorMessageProto proto) throws IOException { -395 if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { -396 throw new InvalidEncryptionKeyException(proto.getMessage()); -397 } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { -398 throw new IOException(proto.getMessage()); -399 } -400 } -401 -402 private String getNegotiatedQop() { -403 return (String) saslClient.getNegotiatedProperty(Sasl.QOP); -404 } -405 -406 private boolean isNegotiatedQopPrivacy() { -407 String qop = getNegotiatedQop(); -408 return qop != null && "auth-conf".equalsIgnoreCase(qop); -409 } -410 -411 private boolean requestedQopContainsPrivacy() { -412 Set<String> requestedQop = -413 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); -414 return requestedQop.contains("auth-conf"); -415 } -416 -417 private void checkSaslComplete() throws IOException { -418 if (!saslClient.isComplete()) { -419 throw new IOException("Failed to complete SASL handshake"); -420 } -421 Set<String> requestedQop = -422 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); -423 String negotiatedQop = getNegotiatedQop(); -424 LOG.debug( -425 "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); -426 if (!requestedQop.contains(negotiatedQop)) { -427 throw new IOException(String.format("SASL handshake completed, but " -428 + "channel does not have acceptable quality of protection, " -429 + "requested = %s, negotiated = %s", -430 requestedQop, negotiatedQop)); -431 } -432 } -433 -434 private boolean useWrap() { -435 String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); -436 return qop != null && !"auth".equalsIgnoreCase(qop); -437 } -438 -439 private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException { -440 byte[] inKey = option.getInKey(); -441 if (inKey != null) { -442 inKey = saslClient.unwrap(inKey, 0, inKey.length); -443 } -444 byte[] outKey = option.getOutKey(); -445 if (outKey != null) { -446 outKey = saslClient.unwrap(outKey, 0, outKey.length); -447 } -448 return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, -449 option.getOutIv()); -450 } -451 -452 private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto, -453 boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { -454 List<CipherOption> cipherOptions = -455 PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList()); -456 if (cipherOptions == null || cipherOptions.isEmpty()) { -457 return null; -458 } -459 CipherOption cipherOption = cipherOptions.get(0); -460 return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; -461 } -462 -463 @Override -464 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { -465 if (msg instanceof DataTransferEncryptorMessageProto) { -466 DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; -467 check(proto); -468 byte[] challenge = proto.getPayload().toByteArray(); -469 byte[] response = saslClient.evaluateChallenge(challenge); -470 switch (step) { -471 case 1: { -472 List<CipherOption> cipherOptions = null; -473 if (requestedQopContainsPrivacy()) { -474 cipherOptions = getCipherOptions(); -475 } -476 sendSaslMessage(ctx, response, cipherOptions); -477 ctx.flush(); -478 step++; -479 break; -480 } -481 case 2: { -482 assert response == null; -483 checkSaslComplete(); -484 CipherOption cipherOption = -485 getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); -486 ChannelPipeline p = ctx.pipeline(); -487 while (p.first() != null) { -488 p.removeFirst(); -489 } -490 if (cipherOption != null) { -491 CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite()); -492 p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()), -493 new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv())); -494 } else { -495 if (useWrap()) { -496 p.addLast(new SaslWrapHandler(saslClient), -497 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), -498 new SaslUnwrapHandler(saslClient)); -499 } -500 } -501 promise.trySuccess(null); -502 break; -503 } -504 default: -505 throw new IllegalArgumentException("Unrecognized negotiation step: " + step); -506 } -507 } else { -508 ctx.fireChannelRead(msg); -509 } -510 } -511 -512 @Override -513 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { -514 promise.tryFailure(cause); -515 } -516 -517 @Override -518 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { -519 if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { -520 promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); -521 } else { -522 super.userEventTriggered(ctx, evt); -523 } -524 } -525 } -526 -527 private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { +368 // Was ByteStringer; fix w/o using ByteStringer. Its in hbase-protocol +369 // and we want to keep that out of hbase-server. +370 builder.setPayload(ByteString.copyFrom(payload)); +371 } +372 if (options != null) { +373 builder.addAllCipherOption(PB_HELPER.convertCipherOptions(options)); +374 } +375 DataTransferEncryptorMessageProto proto = builder.build(); +376 int size = proto.getSerializedSize(); +377 size += CodedOutputStream.computeRawVarint32Size(size); +378 ByteBuf buf = ctx.alloc().buffer(size); +379 proto.writeDelimitedTo(new ByteBufOutputStream(buf)); +380 ctx.write(buf); +381 } +382 +383 @Override +384 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { +385 ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); +386 sendSaslMessage(ctx, new byte[0]); +387 ctx.flush(); +388 step++; +389 } +390 +391 @Override +392 public void channelInactive(ChannelHandlerContext ctx) throws Exception { +393 saslClient.dispose(); +394 } +395 +396 private void check(DataTransferEncryptorMessageProto proto) throws IOException { +397 if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { +398 throw new InvalidEncryptionKeyException(proto.getMessage()); +399 } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { +400 throw new IOException(proto.getMessage()); +401 } +402 } +403 +404 private String getNegotiatedQop() { +405 return (String) saslClient.getNegotiatedProperty(Sasl.QOP); +406 } +407 +408 private boolean isNegotiatedQopPrivacy() { +409 String qop = getNegotiatedQop(); +410 return qop != null && "auth-conf".equalsIgnoreCase(qop); +411 } +412 +413 private boolean requestedQopContainsPrivacy() { +414 Set<String> requestedQop = +415 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); +416 return requestedQop.contains("auth-conf"); +417 } +418 +419 private void checkSaslComplete() throws IOException { +420 if (!saslClient.isComplete()) { +421 throw new IOException("Failed to complete SASL handshake"); +422 } +423 Set<String> requestedQop = +424 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); +425 String negotiatedQop = getNegotiatedQop(); +426 LOG.debug( +427 "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); +428 if (!requestedQop.contains(negotiatedQop)) { +429 throw new IOException(String.format("SASL handshake completed, but " +430 + "channel does not have acceptable quality of protection, " +431 + "requested = %s, negotiated = %s", +432 requestedQop, negotiatedQop)); +433 } +434 } +435 +436 private boolean useWrap() { +437 String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); +438 return qop != null && !"auth".equalsIgnoreCase(qop); +439 } +440 +441 private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException { +442 byte[] inKey = option.getInKey(); +443 if (inKey != null) { +444 inKey = saslClient.unwrap(inKey, 0, inKey.length); +445 } +446 byte[] outKey = option.getOutKey(); +447 if (outKey != null) { +448 outKey = saslClient.unwrap(outKey, 0, outKey.length); +449 } +450 return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, +451 option.getOutIv()); +452 } +453 +454 private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto, +455 boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { +456 List<CipherOption> cipherOptions = +457 PB_HELPER.convertCipherOptionProtos(proto.getCipherOptionList()); +458 if (cipherOptions == null || cipherOptions.isEmpty()) { +459 return null; +460 } +461 CipherOption cipherOption = cipherOptions.get(0); +462 return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; +463 } +464 +465 @Override +466 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { +467 if (msg instanceof DataTransferEncryptorMessageProto) { +468 DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; +469 check(proto); +470 byte[] challenge = proto.getPayload().toByteArray(); +471 byte[] response = saslClient.evaluateChallenge(challenge); +472 switch (step) { +473 case 1: { +474 List<CipherOption> cipherOptions = null; +475 if (requestedQopContainsPrivacy()) { +476 cipherOptions = getCipherOptions(); +477 } +478 sendSaslMessage(ctx, response, cipherOptions); +479 ctx.flush(); +480 step++; +481 break; +482 } +483 case 2: { +484 assert response == null; +485 checkSaslComplete(); +486 CipherOption cipherOption = +487 getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); +488 ChannelPipeline p = ctx.pipeline(); +489 while (p.first() != null) { +490 p.removeFirst(); +491 } +492 if (cipherOption != null) { +493 CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite()); +494 p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()), +495 new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv())); +496 } else { +497 if (useWrap()) { +498 p.addLast(new SaslWrapHandler(saslClient), +499 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), +500 new SaslUnwrapHandler(saslClient)); +501 } +502 } +503 promise.trySuccess(null); +504 break; +505 } +506 default: +507 throw new IllegalArgumentException("Unrecognized negotiation step: " + step); +508 } +509 } else { +510 ctx.fireChannelRead(msg); +511 } +512 } +513 +514 @Override +515 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +516 promise.tryFailure(cause); +517 } +518 +519 @Override +520 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { +521 if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { +522 promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); +523 } else { +524 super.userEventTriggered(ctx, evt); +525 } +526 } +527 } 528 -529 private final SaslClient saslClient; +529 private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { 530 -531 public SaslUnwrapHandler(SaslClient saslClient) { -532 this.saslClient = saslClient; -533 } -534 -535 @Override -536 public void channelInactive(ChannelHandlerContext ctx) throws Exception { -537 saslClient.dispose(); -538 } -539 -540 @Override -541 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { -542 msg.skipBytes(4); -543 byte[] b = new byte[msg.readableBytes()]; -544 msg.readBytes(b); -545 ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); -546 } -547 } -548 -549 private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { +531 private final SaslClient saslClient; +532 +533 public SaslUnwrapHandler(SaslClient saslClient) { +534 this.saslClient = saslClient; +535 } +536 +537 @Override +538 public void channelInactive(ChannelHandlerContext ctx) throws Exception { +539 saslClient.dispose(); +540 } +541 +542 @Override +543 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { +544 msg.skipBytes(4); +545 byte[] b = new byte[msg.readableBytes()]; +546 msg.readBytes(b); +547 ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); +548 } +549 } 550 -551 private final SaslClient saslClient; +551 private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { 552 -553 private CompositeByteBuf cBuf; +553 private final SaslClient saslClient; 554 -555 public SaslWrapHandler(SaslClient saslClient) { -556 this.saslClient = saslClient; -557 } -558 -559 @Override -560 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { -561 cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); -562 } -563 -564 @Override -565 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) -566 throws Exception { -567 if (msg instanceof ByteBuf) { -568 ByteBuf buf = (ByteBuf) msg; -569 cBuf.addComponent(buf); -570 cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); -571 } else { -572 ctx.write(msg); -573 } -574 } -575 -576 @Override -577 public void flush(ChannelHandlerContext ctx) throws Exception { -578 if (cBuf.isReadable()) { -579 byte[] b = new byte[cBuf.readableBytes()]; -580 cBuf.readBytes(b); -581 cBuf.discardReadComponents(); -582 byte[] wrapped = saslClient.wrap(b, 0, b.length); -583 ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); -584 buf.writeInt(wrapped.length); -585 buf.writeBytes(wrapped); -586 ctx.write(buf); -587 } -588 ctx.flush(); -589 } -590 -591 @Override -592 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { -593 cBuf.release(); -594 cBuf = null; -595 } -596 } -597 -598 private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { +555 private CompositeByteBuf cBuf; +556 +557 public SaslWrapHandler(SaslClient saslClient) { +558 this.saslClient = saslClient; +559 } +560 +561 @Override +562 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { +563 cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); +564 } +565 +566 @Override +567 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) +568 throws Exception { +569 if (msg instanceof ByteBuf) { +570 ByteBuf buf = (ByteBuf) msg; +571 cBuf.addComponent(buf); +572 cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); +573 } else { +574 ctx.write(msg); +575 } +576 } +577 +578 @Override +579 public void flush(ChannelHandlerContext ctx) throws Exception { +580 if (cBuf.isReadable()) { +581 byte[] b = new byte[cBuf.readableBytes()]; +582 cBuf.readBytes(b); +583 cBuf.discardReadComponents(); +584 byte[] wrapped = saslClient.wrap(b, 0, b.length); +585 ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); +586 buf.writeInt(wrapped.length); +587 buf.writeBytes(wrapped); +588 ctx.write(buf); +589 } +590 ctx.flush(); +591 } +592 +593 @Override +594 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { +595 cBuf.release(); +596 cBuf = null; +597 } +598 } 599 -600 private final Decryptor decryptor; +600 private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { 601 -602 public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv) -603 throws GeneralSecurityException, IOException { -604 this.decryptor = codec.createDecryptor(); -605 this.decryptor.init(key, Arrays.copyOf(iv, iv.length)); -606 } -607 -608 @Override -609 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { -610 ByteBuf inBuf; -611 boolean release = false; -612 if (msg.nioBufferCount() == 1) { -613 inBuf = msg; -614 } else { -615 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); -616 msg.readBytes(inBuf); -617 release = true; -618 } -619 ByteBuffer inBuffer = inBuf.nioBuffer(); -620 ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); -621 ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes()); -622 decryptor.decrypt(inBuffer, outBuffer); -623 outBuf.writerIndex(inBuf.readableBytes()); -624 if (release) { -625 inBuf.release(); -626 } -627 ctx.fireChannelRead(outBuf); -628 } -629 } -630 -631 private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { +602 private final Decryptor decryptor; +603 +604 public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv) +605 throws GeneralSecurityException, IOException { +606 this.decryptor = codec.createDecryptor(); +607 this.decryptor.init(key, Arrays.copyOf(iv, iv.length)); +608 } +609 +610 @Override +611 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { +612 ByteBuf inBuf; +613 boolean release = false; +614 if (msg.nioBufferCount() == 1) { +615 inBuf = msg; +616 } else { +617 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); +618 msg.readBytes(inBuf); +619 release = true; +620 } +621 ByteBuffer inBuffer = inBuf.nioBuffer(); +622 ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); +623 ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes()); +624 decryptor.decrypt(inBuffer, outBuffer); +625 outBuf.writerIndex(inBuf.readableBytes()); +626 if (release) { +627 inBuf.release(); +628 } +629 ctx.fireChannelRead(outBuf); +630 } +631 } 632 -633 private final Encryptor encryptor; +633 private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { 634 -635 public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv) -636 throws GeneralSecurityException, IOException { -637 this.encryptor = codec.createEncryptor(); -638 this.encryptor.init(key, Arrays.copyOf(iv, iv.length)); -639 } -640 -641 @Override -642 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) -643 throws Exception { -644 if (preferDirect) { -645 return ctx.alloc().directBuffer(msg.readableBytes()); -646 } else { -647 return ctx.alloc().buffer(msg.readableBytes()); -648 } -649 } -650 -651 @Override -652 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { -653 ByteBuf inBuf; -654 boolean release = false; -655 if (msg.nioBufferCount() == 1) { -656 inBuf = msg; -657 } else { -658 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); -659 msg.readBytes(inBuf); -660 release = true; -661 } -662 ByteBuffer inBuffer = inBuf.nioBuffer(); -663 ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes()); -664 encryptor.encrypt(inBuffer, outBuffer); -665 out.writerIndex(inBuf.readableBytes()); -666 if (release) { -667 inBuf.release(); -668 } -669 } -670 } -671 -672 private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { -673 return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER -674 + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); -675 } -676 -677 private static char[] encryptionKeyToPassword(byte[] encryptionKey) { -678 return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray(); -679 } -680 -681 private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { -682 return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8); -683 } -684 -685 private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { -686 return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8) -687 .toCharArray(); -688 } -689 -690 private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { -691 Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); -692 saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); -693 saslProps.put(Sasl.SERVER_AUTH, "true"); -694 saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); -695 return saslProps; -696 } -697 -698 private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, -699 String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise) { -700 try { -701 channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), -702 new ProtobufVarint32FrameDecoder(), -703 new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), -704 new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise)); -705 } catch (SaslException e) { -706 saslPromise.tryFailure(e); -707 } -708 } -709 -710 static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, -711 int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, -712 Promise<Void> saslPromise) throws IOException { -713 SaslDataTransferClient saslClient = client.getSaslDataTransferClient(); -714 SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient); -715 TrustedChannelResolver trustedChannelResolver = -716 SASL_ADAPTOR.getTrustedChannelResolver(saslClient); -717 AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient); -718 InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); -719 if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { -720 saslPromise.trySuccess(null); -721 return; -722 } -723 DataEncryptionKey encryptionKey = client.newDataEncryptionKey(); -724 if (encryptionKey != null) { -725 if (LOG.isDebugEnabled()) { -726 LOG.debug( -727 "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo); -728 } -729 doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), -730 encryptionKeyToPassword(encryptionKey.encryptionKey), -731 createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise); -732 } else if (!UserGroupInformation.isSecurityEnabled()) { -733 if (LOG.isDebugEnabled()) { -734 LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr -735 + ", datanodeId = " + dnInfo); -736 } -737 saslPromise.trySuccess(null); -738 } else if (dnInfo.getXferPort() < 1024) { -739 if (LOG.isDebugEnabled()) { -740 LOG.debug("SASL client skipping handshake in secured configuration with " -741 + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); -742 } -743 saslPromise.trySuccess(null); -744 } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { -745 if (LOG.isDebugEnabled()) { -746 LOG.debug("SASL client skipping handshake in secured configuration with " -747 + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); -748 } -749 saslPromise.trySuccess(null); -750 } else if (saslPropsResolver != null) { -751 if (LOG.isDebugEnabled()) { -752 LOG.debug( -753 "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); -754 } -755 doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), -756 buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise); -757 } else { -758 // It's a secured cluster using non-privileged ports, but no SASL. The only way this can -759 // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare -760 // edge case. -761 if (LOG.isDebugEnabled()) { -762 LOG.debug("SASL client skipping handshake in secured configuration with no SASL " -763 + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); -764 } -765 saslPromise.trySuccess(null); -766 } -767 } -768 -769 static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client) -770 throws IOException { -771 FileEncryptionInfo feInfo = stat.getFileEncryptionInfo(); -772 if (feInfo == null) { -773 return null; -774 } -775 return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client); -776 } -777} +635 private final Encryptor encryptor; +636 +637 public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv) +638 throws GeneralSecurityException, IOException { +639 this.encryptor = codec.createEncryptor(); +640 this.encryptor.init(key, Arrays.copyOf(iv, iv.length)); +641 } +642 +643 @Override +644 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) +645 throws Exception { +646 if (preferDirect) { +647 return ctx.alloc().directBuffer(msg.readableBytes()); +648 } else { +649 return ctx.alloc().buffer(msg.readableBytes()); +650 } +651 } +652 +653 @Override +654 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { +655 ByteBuf inBuf; +656 boolean release = false; +657 if (msg.nioBufferCount() == 1) { +658 inBuf = msg; +659 } else { +660 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); +661 msg.readBytes(inBuf); +662 release = true; +663 } +664 ByteBuffer inBuffer = inBuf.nioBuffer(); +665 ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes()); +666 encryptor.encrypt(inBuffer, outBuffer); +667 out.writerIndex(inBuf.readableBytes()); +668 if (release) { +669 inBuf.release(); +670 } +671 } +672 } +673 +674 private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { +675 return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER +676 + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); +677 } +678 +679 private static char[] encryptionKeyToPassword(byte[] encryptionKey) { +680 return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray(); +681 } +682 +683 private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { +684 return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8); +685 } +686 +687 private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { +688 return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8) +689 .toCharArray(); +690 } +691 +692 private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { +693 Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); +694 saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); +695 saslProps.put(Sasl.SERVER_AUTH, "true"); +696 saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); +697 return saslProps; +698 } +699 +700 private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, +701 String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise) { +702 try { +703 channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), +704 new ProtobufVarint32FrameDecoder(), +705 new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), +706 new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise)); +707 } catch (SaslException e) { +708 saslPromise.tryFailure(e); +709 } +710 } +711 +712 static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, +713 int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, +714 Promise<Void> saslPromise) throws IOException { +715 SaslDataTransferClient saslClient = client.getSaslDataTransferClient(); +716 SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient); +717 TrustedChannelResolver trustedChannelResolver = +718 SASL_ADAPTOR.getTrustedChannelResolver(saslClient); +719 AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient); +720 InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); +721 if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { +722 saslPromise.trySuccess(null); +723 return; +724 } +725 DataEncryptionKey encryptionKey = client.newDataEncryptionKey(); +726 if (encryptionKey != null) { +727 if (LOG.isDebugEnabled()) { +728 LOG.debug( +729 "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo); +730 } +731 doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), +732 encryptionKeyToPassword(encryptionKey.encryptionKey), +733 createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise); +734 } else if (!UserGroupInformation.isSecurityEnabled()) { +735 if (LOG.isDebugEnabled()) { +736 LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr +737 + ", datanodeId = " + dnInfo); +738 } +739 saslPromise.trySuccess(null); +740 } else if (dnInfo.getXferPort() < 1024) { +741 if (LOG.isDebugEnabled()) { +742 LOG.debug("SASL client skipping handshake in secured configuration with " +743 + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); +744 } +745 saslPromise.trySuccess(null); +746 } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { +747 if (LOG.isDebugEnabled()) { +748 LOG.debug("SASL client skipping handshake in secured configuration with " +749 + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); +750 } +751 saslPromise.trySuccess(null); +752 } else if (saslPropsResolver != null) { +753 if (LOG.isDebugEnabled()) { +754 LOG.debug( +755 "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); +756 } +757 doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), +758 buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise); +759 } else { +760 // It's a secured cluster using non-privileged ports, but no SASL. The only way this can +761 // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare +762 // edge case. +763 if (LOG.isDebugEnabled()) { +764 LOG.debug("SASL client skipping handshake in secured configuration with no SASL " +765 + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); +766 } +767 saslPromise.trySuccess(null); +768 } +769 } +770 +771 static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client) +772 throws IOException { +773 FileEncryptionInfo feInfo = stat.getFileEncryptionInfo(); +774 if (feInfo == null) { +775 return null; +776 } +777 return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client); +778 } +779}