hbase-commits mailing list archives

From ecl...@apache.org
Subject [31/50] [abbrv] hbase git commit: HBASE-15628 Implement an AsyncOutputStream which can work with any FileSystem implementation
Date Mon, 25 Apr 2016 21:13:15 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
deleted file mode 100644
index 2225191..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
+++ /dev/null
@@ -1,753 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.util;
-
-import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
-import static io.netty.handler.timeout.IdleState.READER_IDLE;
-import static org.apache.hadoop.fs.CreateFlag.CREATE;
-import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
-import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoop;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.CodedOutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystemLinkResolver;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.ConnectionUtils;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DFSOutputStream;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
-import org.apache.hadoop.io.EnumSetWritable;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-
-/**
- * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
- */
-@InterfaceAudience.Private
-public final class FanOutOneBlockAsyncDFSOutputHelper {
-
-  private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputHelper.class);
-
-  private FanOutOneBlockAsyncDFSOutputHelper() {
-  }
-
-  // use pooled allocator for performance.
-  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
-
-  // copied from DFSPacket since it is package private.
-  public static final long HEART_BEAT_SEQNO = -1L;
-
-  // helper for creating the DataChecksum object.
-  private static final Method CREATE_CHECKSUM;
-
-  // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
-  // getStatus method, and for hadoop 2.7 or after, the status is retrieved from the flag. The
-  // flag may come from the proto directly, or be combined from the reply field of the proto and
-  // an ECN object. See createPipelineAckStatusGetter for more details.
-  private interface PipelineAckStatusGetter {
-    Status get(PipelineAckProto ack);
-  }
-
-  private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
-
-  // The StorageType enum was added in hadoop 2.4, but it was moved to another package in hadoop
-  // 2.6, and the setter method in OpWriteBlockProto was also added in hadoop 2.6. So we need to
-  // skip the setStorageType call on hadoop 2.5 or before. See createStorageTypeSetter for more
-  // details.
-  private interface StorageTypeSetter {
-    OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
-  }
-
-  private static final StorageTypeSetter STORAGE_TYPE_SETTER;
-
-  // helper class for calling the create method on the namenode. Hadoop 2.6 and later add a
-  // supportedVersions parameter. See createFileCreater for more details.
-  private interface FileCreater {
-    HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
-        String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
-        short replication, long blockSize) throws IOException;
-  }
-
-  private static final FileCreater FILE_CREATER;
-
-  // helper class for adding or removing a lease from DFSClient. Hadoop 2.4 uses src as the Map's
-  // key, and hadoop 2.5 or later uses inodeId. See createLeaseManager for more details.
-  private interface LeaseManager {
-
-    void begin(DFSClient client, String src, long inodeId);
-
-    void end(DFSClient client, String src, long inodeId);
-  }
-
-  private static final LeaseManager LEASE_MANAGER;
-
-  // This is used to terminate a recoverFileLease call when the FileSystem is already closed.
-  // isClientRunning is not public, so we need to use reflection.
-  private interface DFSClientAdaptor {
-
-    boolean isClientRunning(DFSClient client);
-  }
-
-  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
-
-  private static DFSClientAdaptor createDFSClientAdaptor() {
-    try {
-      final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
-      isClientRunningMethod.setAccessible(true);
-      return new DFSClientAdaptor() {
-
-        @Override
-        public boolean isClientRunning(DFSClient client) {
-          try {
-            return (Boolean) isClientRunningMethod.invoke(client);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      };
-    } catch (NoSuchMethodException e) {
-      throw new Error(e);
-    }
-  }
-
-  private static LeaseManager createLeaseManager() {
-    try {
-      final Method beginFileLeaseMethod =
-          DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
-      beginFileLeaseMethod.setAccessible(true);
-      final Method endFileLeaseMethod =
-          DFSClient.class.getDeclaredMethod("endFileLease", long.class);
-      endFileLeaseMethod.setAccessible(true);
-      return new LeaseManager() {
-
-        @Override
-        public void begin(DFSClient client, String src, long inodeId) {
-          try {
-            beginFileLeaseMethod.invoke(client, inodeId, null);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
-        }
-
-        @Override
-        public void end(DFSClient client, String src, long inodeId) {
-          try {
-            endFileLeaseMethod.invoke(client, inodeId);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      };
-    } catch (NoSuchMethodException e) {
-      LOG.warn("No inodeId related lease methods found, should be hadoop 2.4-", e);
-    }
-    try {
-      final Method beginFileLeaseMethod =
-          DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
-      beginFileLeaseMethod.setAccessible(true);
-      final Method endFileLeaseMethod =
-          DFSClient.class.getDeclaredMethod("endFileLease", String.class);
-      endFileLeaseMethod.setAccessible(true);
-      return new LeaseManager() {
-
-        @Override
-        public void begin(DFSClient client, String src, long inodeId) {
-          try {
-            beginFileLeaseMethod.invoke(client, src, null);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
-        }
-
-        @Override
-        public void end(DFSClient client, String src, long inodeId) {
-          try {
-            endFileLeaseMethod.invoke(client, src);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      };
-    } catch (NoSuchMethodException e) {
-      throw new Error(e);
-    }
-  }
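// For reference, the two private DFSClient lease signatures probed above, inferred from the
// getDeclaredMethod calls (neither is public API):
//   hadoop 2.5+ : beginFileLease(long inodeId, DFSOutputStream out) / endFileLease(long inodeId)
//   hadoop 2.4  : beginFileLease(String src, DFSOutputStream out)   / endFileLease(String src)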
-
-  private static PipelineAckStatusGetter createPipelineAckStatusGetter() {
-    try {
-      final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
-      @SuppressWarnings("rawtypes")
-      Class<? extends Enum> ecnClass;
-      try {
-        ecnClass =
-            Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
-                .asSubclass(Enum.class);
-      } catch (ClassNotFoundException e) {
-        throw new Error(e);
-      }
-      @SuppressWarnings("unchecked")
-      final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
-      final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
-      final Method combineHeaderMethod =
-          PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
-      final Method getStatusFromHeaderMethod =
-          PipelineAck.class.getMethod("getStatusFromHeader", int.class);
-      return new PipelineAckStatusGetter() {
-
-        @Override
-        public Status get(PipelineAckProto ack) {
-          try {
-            @SuppressWarnings("unchecked")
-            List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
-            Integer headerFlag;
-            if (flagList.isEmpty()) {
-              Status reply = (Status) getReplyMethod.invoke(ack, 0);
-              headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
-            } else {
-              headerFlag = flagList.get(0);
-            }
-            return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      };
-    } catch (NoSuchMethodException e) {
-      LOG.warn("Can not get expected methods, should be hadoop 2.6-", e);
-    }
-    try {
-      final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
-      return new PipelineAckStatusGetter() {
-
-        @Override
-        public Status get(PipelineAckProto ack) {
-          try {
-            return (Status) getStatusMethod.invoke(ack, 0);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      };
-    } catch (NoSuchMethodException e) {
-      throw new Error(e);
-    }
-  }
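// What the reflection above emulates, written directly against each API generation. A sketch
// only: the helper names are hypothetical and each body compiles only against the matching
// hadoop version.
private static Status getAckStatus27(PipelineAckProto ack) {
  // hadoop 2.7+: the status is carried in the flag list, possibly combined with an ECN value.
  List<Integer> flagList = ack.getFlagList();
  int headerFlag = flagList.isEmpty()
      ? PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, ack.getReply(0))
      : flagList.get(0);
  return PipelineAck.getStatusFromHeader(headerFlag);
}

private static Status getAckStatus26(PipelineAckProto ack) {
  // hadoop 2.6-: the proto still exposes a direct status accessor.
  return ack.getStatus(0);
}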
-
-  private static StorageTypeSetter createStorageTypeSetter() {
-    final Method setStorageTypeMethod;
-    try {
-      setStorageTypeMethod =
-          OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
-    } catch (NoSuchMethodException e) {
-      LOG.warn("noSetStorageType method found, should be hadoop 2.5-", e);
-      return new StorageTypeSetter() {
-
-        @Override
-        public Builder set(Builder builder, Enum<?> storageType) {
-          return builder;
-        }
-      };
-    }
-    ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
-    for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
-      builder.put(storageTypeProto.name(), storageTypeProto);
-    }
-    final ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
-    return new StorageTypeSetter() {
-
-      @Override
-      public Builder set(Builder builder, Enum<?> storageType) {
-        Object protoEnum = name2ProtoEnum.get(storageType.name());
-        try {
-          setStorageTypeMethod.invoke(builder, protoEnum);
-        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-        return builder;
-      }
-    };
-  }
-
-  private static FileCreater createFileCreater() {
-    for (Method method : ClientProtocol.class.getMethods()) {
-      if (method.getName().equals("create")) {
-        final Method createMethod = method;
-        Class<?>[] paramTypes = createMethod.getParameterTypes();
-        if (paramTypes[paramTypes.length - 1] == long.class) {
-          return new FileCreater() {
-
-            @Override
-            public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
-                String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
-                short replication, long blockSize) throws IOException {
-              try {
-                return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
-                  flag, createParent, replication, blockSize);
-              } catch (IllegalAccessException e) {
-                throw new RuntimeException(e);
-              } catch (InvocationTargetException e) {
-                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
-                throw new RuntimeException(e);
-              }
-            }
-          };
-        } else {
-          try {
-            Class<?> cryptoProtocolVersionClass =
-                Class.forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
-            Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
-            final Object supported = supportedMethod.invoke(null);
-            return new FileCreater() {
-
-              @Override
-              public HdfsFileStatus create(ClientProtocol namenode, String src,
-                  FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag,
-                  boolean createParent, short replication, long blockSize) throws IOException {
-                try {
-                  return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
-                    flag, createParent, replication, blockSize, supported);
-                } catch (IllegalAccessException e) {
-                  throw new RuntimeException(e);
-                } catch (InvocationTargetException e) {
-                  Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
-                  throw new RuntimeException(e);
-                }
-              }
-            };
-          } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
-              | InvocationTargetException e) {
-            throw new Error(e);
-          }
-        }
-      }
-    }
-    throw new Error("No create method found for " + ClientProtocol.class.getName());
-  }
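// The two ClientProtocol.create variants distinguished above, written out directly. A sketch
// only: the method names here are hypothetical and each body compiles only against the
// matching hadoop version.
private static HdfsFileStatus create25(ClientProtocol nn, String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
    short replication, long blockSize) throws IOException {
  // hadoop 2.5-: the parameter list ends with blockSize.
  return nn.create(src, masked, clientName, flag, createParent, replication, blockSize);
}

private static HdfsFileStatus create26(ClientProtocol nn, String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
    short replication, long blockSize) throws IOException {
  // hadoop 2.6+: a CryptoProtocolVersion[] parameter is appended.
  return nn.create(src, masked, clientName, flag, createParent, replication, blockSize,
    CryptoProtocolVersion.supported());
}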
-
-  // cancel the processing if DFSClient is already closed.
-  static final class CancelOnClose implements CancelableProgressable {
-
-    private final DFSClient client;
-
-    public CancelOnClose(DFSClient client) {
-      this.client = client;
-    }
-
-    @Override
-    public boolean progress() {
-      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
-    }
-  }
-
-  static {
-    try {
-      CREATE_CHECKSUM = DFSClient.Conf.class.getDeclaredMethod("createChecksum");
-      CREATE_CHECKSUM.setAccessible(true);
-    } catch (NoSuchMethodException e) {
-      throw new Error(e);
-    }
-
-    PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
-    STORAGE_TYPE_SETTER = createStorageTypeSetter();
-    FILE_CREATER = createFileCreater();
-    LEASE_MANAGER = createLeaseManager();
-    DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
-  }
-
-  static void beginFileLease(DFSClient client, String src, long inodeId) {
-    LEASE_MANAGER.begin(client, src, inodeId);
-  }
-
-  static void endFileLease(DFSClient client, String src, long inodeId) {
-    LEASE_MANAGER.end(client, src, inodeId);
-  }
-
-  static DataChecksum createChecksum(DFSClient client) {
-    try {
-      return (DataChecksum) CREATE_CHECKSUM.invoke(client.getConf());
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  static Status getStatus(PipelineAckProto ack) {
-    return PIPELINE_ACK_STATUS_GETTER.get(ack);
-  }
-
-  private static void processWriteBlockResponse(Channel channel, final DatanodeInfo dnInfo,
-      final Promise<Channel> promise, final int timeoutMs) {
-    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
-      new ProtobufVarint32FrameDecoder(),
-      new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
-      new SimpleChannelInboundHandler<BlockOpResponseProto>() {
-
-        @Override
-        protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
-            throws Exception {
-          Status pipelineStatus = resp.getStatus();
-          if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
-            throw new IOException("datanode " + dnInfo + " is restarting");
-          }
-          String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
-          if (resp.getStatus() != Status.SUCCESS) {
-            if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
-              throw new InvalidBlockTokenException("Got access token error" + ", status message "
-                  + resp.getMessage() + ", " + logInfo);
-            } else {
-              throw new IOException("Got error" + ", status=" + resp.getStatus().name()
-                  + ", status message " + resp.getMessage() + ", " + logInfo);
-            }
-          }
-          // success
-          ChannelPipeline p = ctx.pipeline();
-          for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
-            // do not remove all handlers because we may have wrap or unwrap handlers at the head
-            // of the pipeline.
-            if (handler instanceof IdleStateHandler) {
-              break;
-            }
-          }
-          // Disable auto read here. Enable it after we setup the streaming pipeline in
-          // FanOutOneBLockAsyncDFSOutput.
-          ctx.channel().config().setAutoRead(false);
-          promise.trySuccess(ctx.channel());
-        }
-
-        @Override
-        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-          promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
-        }
-
-        @Override
-        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-          if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
-            promise
-                .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
-          } else {
-            super.userEventTriggered(ctx, evt);
-          }
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-          promise.tryFailure(cause);
-        }
-      });
-  }
-
-  private static void requestWriteBlock(Channel channel, Enum<?> storageType,
-      OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
-    OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
-    int protoLen = proto.getSerializedSize();
-    ByteBuf buffer =
-        channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
-    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-    buffer.writeByte(Op.WRITE_BLOCK.code);
-    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
-    channel.writeAndFlush(buffer);
-  }
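// Wire layout produced by requestWriteBlock above, in write order:
//   2 bytes : DataTransferProtocol.DATA_TRANSFER_VERSION
//   1 byte  : Op.WRITE_BLOCK.code
//   varint  : serialized size of OpWriteBlockProto
//   n bytes : OpWriteBlockProto body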
-
-  private static void initialize(Configuration conf, final Channel channel,
-      final DatanodeInfo dnInfo, final Enum<?> storageType,
-      final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs,
-      DFSClient client, Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
-    Promise<Void> saslPromise = channel.eventLoop().newPromise();
-    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
-    saslPromise.addListener(new FutureListener<Void>() {
-
-      @Override
-      public void operationComplete(Future<Void> future) throws Exception {
-        if (future.isSuccess()) {
-          // setup response processing pipeline first, then send request.
-          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
-          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
-        } else {
-          promise.tryFailure(future.cause());
-        }
-      }
-    });
-  }
-
-  private static List<Future<Channel>> connectToDataNodes(final Configuration conf,
-      final DFSClient client, String clientName, final LocatedBlock locatedBlock,
-      long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer,
-      EventLoop eventLoop) {
-    Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
-    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
-    boolean connectToDnViaHostname =
-        conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
-    final int timeoutMs =
-        conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT);
-    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
-    blockCopy.setNumBytes(locatedBlock.getBlockSize());
-    ClientOperationHeaderProto header =
-        ClientOperationHeaderProto
-            .newBuilder()
-            .setBaseHeader(
-              BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy))
-                  .setToken(PBHelper.convert(locatedBlock.getBlockToken())))
-            .setClientName(clientName).build();
-    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
-    final OpWriteBlockProto.Builder writeBlockProtoBuilder =
-        OpWriteBlockProto.newBuilder().setHeader(header)
-            .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
-            .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
-            .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
-            .setRequestedChecksum(checksumProto)
-            .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
-    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
-    for (int i = 0; i < datanodeInfos.length; i++) {
-      final DatanodeInfo dnInfo = datanodeInfos[i];
-      // Use Enum here because StorageType was moved to another package in hadoop 2.6. Using
-      // StorageType directly would cause a compilation error on hadoop 2.5 or before.
-      final Enum<?> storageType = storageTypes[i];
-      final Promise<Channel> promise = eventLoop.newPromise();
-      futureList.add(promise);
-      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
-      new Bootstrap().group(eventLoop).channel(NioSocketChannel.class)
-          .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
-
-            @Override
-            protected void initChannel(Channel ch) throws Exception {
-              // we need the remote address of the channel, so we can only move on after the
-              // channel is connected. Leave an empty implementation here because netty does not
-              // allow a null handler.
-            }
-          }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-              if (future.isSuccess()) {
-                initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
-                  timeoutMs, client, locatedBlock.getBlockToken(), promise);
-              } else {
-                promise.tryFailure(future.cause());
-              }
-            }
-          });
-    }
-    return futureList;
-  }
-
-  /**
-   * An exception other than RemoteException, thrown when calling create on the namenode.
-   */
-  public static class NameNodeException extends IOException {
-
-    private static final long serialVersionUID = 3143237406477095390L;
-
-    public NameNodeException(Throwable cause) {
-      super(cause);
-    }
-  }
-
-  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
-      boolean overwrite, boolean createParent, short replication, long blockSize,
-      EventLoop eventLoop) throws IOException {
-    Configuration conf = dfs.getConf();
-    FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
-    DFSClient client = dfs.getClient();
-    String clientName = client.getClientName();
-    ClientProtocol namenode = client.getNamenode();
-    HdfsFileStatus stat;
-    try {
-      stat =
-          FILE_CREATER.create(
-            namenode,
-            src,
-            FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)),
-            clientName,
-            new EnumSetWritable<CreateFlag>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet
-                .of(CREATE)), createParent, replication, blockSize);
-    } catch (Exception e) {
-      if (e instanceof RemoteException) {
-        throw (RemoteException) e;
-      } else {
-        throw new NameNodeException(e);
-      }
-    }
-    beginFileLease(client, src, stat.getFileId());
-    boolean succ = false;
-    LocatedBlock locatedBlock = null;
-    List<Future<Channel>> futureList = null;
-    try {
-      DataChecksum summer = createChecksum(client);
-      locatedBlock =
-          namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null);
-      List<Channel> datanodeList = new ArrayList<>();
-      futureList =
-          connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE,
-            summer, eventLoop);
-      for (Future<Channel> future : futureList) {
-        // fail the creation if there are connection failures since we are fail-fast. The upper
-        // layer should retry itself if needed.
-        datanodeList.add(future.syncUninterruptibly().getNow());
-      }
-      succ = true;
-      return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName,
-          src, stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC);
-    } finally {
-      if (!succ) {
-        if (futureList != null) {
-          for (Future<Channel> f : futureList) {
-            f.addListener(new FutureListener<Channel>() {
-
-              @Override
-              public void operationComplete(Future<Channel> future) throws Exception {
-                if (future.isSuccess()) {
-                  future.getNow().close();
-                }
-              }
-            });
-          }
-        }
-        endFileLease(client, src, stat.getFileId());
-        fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
-      }
-    }
-  }
-
-  /**
-   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it
-   * inside an {@link EventLoop}.
-   * @param eventLoop all connections to datanode will use the same event loop.
-   */
-  public static FanOutOneBlockAsyncDFSOutput createOutput(final DistributedFileSystem dfs, Path f,
-      final boolean overwrite, final boolean createParent, final short replication,
-      final long blockSize, final EventLoop eventLoop) throws IOException {
-    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
-
-      @Override
-      public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException,
-          UnresolvedLinkException {
-        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
-          blockSize, eventLoop);
-      }
-
-      @Override
-      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
-        throw new UnsupportedOperationException();
-      }
-    }.resolve(dfs, f);
-  }
-
-  public static boolean shouldRetryCreate(RemoteException e) {
-    // RetryStartFileException was introduced in HDFS 2.6+, so here we can only check the class
-    // name. For exceptions other than this we just throw them out. This is the same as
-    // DFSOutputStream.newStreamForCreate.
-    return e.getClassName().endsWith("RetryStartFileException");
-  }
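// A sketch of how a caller might combine createOutput with shouldRetryCreate. The helper
// below is hypothetical (not part of this class) and the retry policy is an assumption.
static FanOutOneBlockAsyncDFSOutput createOutputWithRetry(DistributedFileSystem dfs, Path f,
    boolean overwrite, boolean createParent, short replication, long blockSize,
    EventLoop eventLoop) throws IOException {
  for (int retry = 0;; retry++) {
    try {
      return createOutput(dfs, f, overwrite, createParent, replication, blockSize, eventLoop);
    } catch (RemoteException e) {
      if (!shouldRetryCreate(e)) {
        throw e;
      }
      sleepIgnoreInterrupt(retry);
    }
  }
}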
-
-  static void completeFile(DFSClient client, ClientProtocol namenode, String src,
-      String clientName, ExtendedBlock block, long fileId) {
-    for (int retry = 0;; retry++) {
-      try {
-        if (namenode.complete(src, clientName, block, fileId)) {
-          endFileLease(client, src, fileId);
-          return;
-        } else {
-          LOG.warn("complete file " + src + " not finished, retry = " + retry);
-        }
-      } catch (LeaseExpiredException e) {
-        LOG.warn("lease for file " + src + " is expired, give up", e);
-        return;
-      } catch (Exception e) {
-        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
-      }
-      sleepIgnoreInterrupt(retry);
-    }
-  }
-
-  static void sleepIgnoreInterrupt(int retry) {
-    try {
-      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
-    } catch (InterruptedException e) {
-    }
-  }
-}
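
For context, a minimal caller of the helper deleted above would look roughly like the sketch
below. Only createOutput is defined in this file; the EventLoop setup uses plain netty, and the
write/flush/close methods of FanOutOneBlockAsyncDFSOutput live in its companion class, so they
are only hinted at here.

    EventLoop eventLoop = new NioEventLoopGroup(1).next();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(
        dfs, new Path("/tmp/test"), true, false, (short) 3, dfs.getDefaultBlockSize(),
        eventLoop);
    // ... write through the async output on the event loop, then close ...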

http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java
deleted file mode 100644
index 341d4ec..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ /dev/null
@@ -1,1032 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.util;
-
-import static io.netty.handler.timeout.IdleState.READER_IDLE;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.MessageToByteEncoder;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Promise;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.security.SaslPropertiesResolver;
-import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-
-/**
- * Helper class for adding SASL support to {@link FanOutOneBlockAsyncDFSOutput}.
- */
-@InterfaceAudience.Private
-public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
-
-  private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputSaslHelper.class);
-
-  private FanOutOneBlockAsyncDFSOutputSaslHelper() {
-  }
-
-  private static final String SERVER_NAME = "0";
-  private static final String PROTOCOL = "hdfs";
-  private static final String MECHANISM = "DIGEST-MD5";
-  private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
-  private static final String NAME_DELIMITER = " ";
-  private static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
-      "dfs.encrypt.data.transfer.cipher.suites";
-  private static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding";
-
-  private interface SaslAdaptor {
-
-    SaslPropertiesResolver getSaslPropsResolver(DFSClient client);
-
-    TrustedChannelResolver getTrustedChannelResolver(DFSClient client);
-
-    AtomicBoolean getFallbackToSimpleAuth(DFSClient client);
-
-    DataEncryptionKey createDataEncryptionKey(DFSClient client);
-  }
-
-  private static final SaslAdaptor SASL_ADAPTOR;
-
-  private interface CipherHelper {
-
-    List<Object> getCipherOptions(Configuration conf) throws IOException;
-
-    void addCipherOptions(DataTransferEncryptorMessageProto.Builder builder,
-        List<Object> cipherOptions);
-
-    Object getCipherOption(DataTransferEncryptorMessageProto proto, boolean isNegotiatedQopPrivacy,
-        SaslClient saslClient) throws IOException;
-
-    Object getCipherSuite(Object cipherOption);
-
-    byte[] getInKey(Object cipherOption);
-
-    byte[] getInIv(Object cipherOption);
-
-    byte[] getOutKey(Object cipherOption);
-
-    byte[] getOutIv(Object cipherOption);
-  }
-
-  private static final CipherHelper CIPHER_HELPER;
-
-  private static final class CryptoCodec {
-
-    private static final Method CREATE_CODEC;
-
-    private static final Method CREATE_ENCRYPTOR;
-
-    private static final Method CREATE_DECRYPTOR;
-
-    private static final Method INIT_ENCRYPTOR;
-
-    private static final Method INIT_DECRYPTOR;
-
-    private static final Method ENCRYPT;
-
-    private static final Method DECRYPT;
-
-    static {
-      Class<?> cryptoCodecClass = null;
-      try {
-        cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec");
-      } catch (ClassNotFoundException e) {
-        LOG.warn("No CryptoCodec class found, should be hadoop 2.5-", e);
-      }
-      if (cryptoCodecClass != null) {
-        Method getInstanceMethod = null;
-        for (Method method : cryptoCodecClass.getMethods()) {
-          if (method.getName().equals("getInstance") && method.getParameterTypes().length == 2) {
-            getInstanceMethod = method;
-            break;
-          }
-        }
-        CREATE_CODEC = getInstanceMethod;
-        try {
-          CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor");
-          CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor");
-
-          Class<?> encryptorClass = Class.forName("org.apache.hadoop.crypto.Encryptor");
-          INIT_ENCRYPTOR = encryptorClass.getMethod("init");
-          ENCRYPT = encryptorClass.getMethod("encrypt", ByteBuffer.class, ByteBuffer.class);
-
-          Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor");
-          INIT_DECRYPTOR = decryptorClass.getMethod("init");
-          DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class);
-        } catch (NoSuchMethodException | ClassNotFoundException e) {
-          throw new Error(e);
-        }
-      } else {
-        LOG.warn("Can not initialize CryptoCodec, should be hadoop 2.5-");
-        CREATE_CODEC = null;
-        CREATE_ENCRYPTOR = null;
-        CREATE_DECRYPTOR = null;
-        INIT_ENCRYPTOR = null;
-        INIT_DECRYPTOR = null;
-        ENCRYPT = null;
-        DECRYPT = null;
-      }
-    }
-
-    private final Object encryptor;
-
-    private final Object decryptor;
-
-    public CryptoCodec(Configuration conf, Object cipherOption) {
-      Object codec;
-      try {
-        codec = CREATE_CODEC.invoke(null, conf, CIPHER_HELPER.getCipherSuite(cipherOption));
-        encryptor = CREATE_ENCRYPTOR.invoke(codec);
-        byte[] encKey = CIPHER_HELPER.getInKey(cipherOption);
-        byte[] encIv = CIPHER_HELPER.getInIv(cipherOption);
-        INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length));
-
-        decryptor = CREATE_DECRYPTOR.invoke(codec);
-        byte[] decKey = CIPHER_HELPER.getOutKey(cipherOption);
-        byte[] decIv = CIPHER_HELPER.getOutIv(cipherOption);
-        INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length));
-      } catch (IllegalAccessException | InvocationTargetException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
-      try {
-        ENCRYPT.invoke(encryptor, inBuffer, outBuffer);
-      } catch (IllegalAccessException | InvocationTargetException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
-      try {
-        DECRYPT.invoke(decryptor, inBuffer, outBuffer);
-      } catch (IllegalAccessException | InvocationTargetException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
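// What the reflective wrapper above does, written directly against the hadoop 2.6+ crypto API
// (a sketch; it is only valid where org.apache.hadoop.crypto is on the classpath):
//   CryptoCodec codec = CryptoCodec.getInstance(conf, option.getCipherSuite());
//   Encryptor enc = codec.createEncryptor();
//   enc.init(option.getInKey(), option.getInIv().clone());
//   enc.encrypt(inBuffer, outBuffer);  // ByteBuffer in, ByteBuffer out
// and symmetrically createDecryptor() initialized with the out key and IV.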
-
-  private static SaslAdaptor createSaslAdaptor27(Class<?> saslDataTransferClientClass)
-      throws NoSuchFieldException, NoSuchMethodException {
-    final Field saslPropsResolverField =
-        saslDataTransferClientClass.getDeclaredField("saslPropsResolver");
-    saslPropsResolverField.setAccessible(true);
-    final Field trustedChannelResolverField =
-        saslDataTransferClientClass.getDeclaredField("trustedChannelResolver");
-    trustedChannelResolverField.setAccessible(true);
-    final Field fallbackToSimpleAuthField =
-        saslDataTransferClientClass.getDeclaredField("fallbackToSimpleAuth");
-    fallbackToSimpleAuthField.setAccessible(true);
-    final Method getSaslDataTransferClientMethod =
-        DFSClient.class.getMethod("getSaslDataTransferClient");
-    final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey");
-    return new SaslAdaptor() {
-
-      @Override
-      public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
-        try {
-          return (TrustedChannelResolver) trustedChannelResolverField
-              .get(getSaslDataTransferClientMethod.invoke(client));
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
-        try {
-          return (SaslPropertiesResolver) saslPropsResolverField
-              .get(getSaslDataTransferClientMethod.invoke(client));
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
-        try {
-          return (AtomicBoolean) fallbackToSimpleAuthField.get(getSaslDataTransferClientMethod
-              .invoke(client));
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
-        try {
-          return (DataEncryptionKey) newDataEncryptionKeyMethod.invoke(client);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private static SaslAdaptor createSaslAdaptor25() {
-    try {
-      final Field trustedChannelResolverField =
-          DFSClient.class.getDeclaredField("trustedChannelResolver");
-      trustedChannelResolverField.setAccessible(true);
-      final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
-      return new SaslAdaptor() {
-
-        @Override
-        public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
-          try {
-            return (TrustedChannelResolver) trustedChannelResolverField.get(client);
-          } catch (IllegalAccessException e) {
-            throw new RuntimeException(e);
-          }
-        }
-
-        @Override
-        public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
-          return null;
-        }
-
-        @Override
-        public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
-          return null;
-        }
-
-        @Override
-        public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
-          try {
-            return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      };
-    } catch (NoSuchFieldException | NoSuchMethodException e) {
-      throw new Error(e);
-    }
-
-  }
-
-  private static SaslAdaptor createSaslAdaptor() {
-    Class<?> saslDataTransferClientClass = null;
-    try {
-      saslDataTransferClientClass =
-          Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient");
-    } catch (ClassNotFoundException e) {
-      LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-");
-    }
-    try {
-      return saslDataTransferClientClass != null ? createSaslAdaptor27(saslDataTransferClientClass)
-          : createSaslAdaptor25();
-    } catch (NoSuchFieldException | NoSuchMethodException e) {
-      throw new Error(e);
-    }
-  }
-
-  private static CipherHelper createCipherHelper25() {
-    return new CipherHelper() {
-
-      @Override
-      public byte[] getOutKey(Object cipherOption) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public byte[] getOutIv(Object cipherOption) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public byte[] getInKey(Object cipherOption) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public byte[] getInIv(Object cipherOption) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Object getCipherSuite(Object cipherOption) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public List<Object> getCipherOptions(Configuration conf) {
-        return null;
-      }
-
-      @Override
-      public Object getCipherOption(DataTransferEncryptorMessageProto proto,
-          boolean isNegotiatedQopPrivacy, SaslClient saslClient) {
-        return null;
-      }
-
-      @Override
-      public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
-        throw new UnsupportedOperationException();
-      }
-    };
-  }
-
-  private static CipherHelper createCipherHelper27(Class<?> cipherOptionClass)
-      throws ClassNotFoundException, NoSuchMethodException {
-    @SuppressWarnings("rawtypes")
-    Class<? extends Enum> cipherSuiteClass =
-        Class.forName("org.apache.hadoop.crypto.CipherSuite").asSubclass(Enum.class);
-    @SuppressWarnings("unchecked")
-    final Enum<?> aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING");
-    final Constructor<?> cipherOptionConstructor =
-        cipherOptionClass.getConstructor(cipherSuiteClass);
-    final Constructor<?> cipherOptionWithKeyAndIvConstructor =
-        cipherOptionClass.getConstructor(cipherSuiteClass, byte[].class, byte[].class,
-          byte[].class, byte[].class);
-
-    final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite");
-    final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey");
-    final Method getInIvMethod = cipherOptionClass.getMethod("getInIv");
-    final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
-    final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
-
-    final Method convertCipherOptionsMethod =
-        PBHelper.class.getMethod("convertCipherOptions", List.class);
-    final Method convertCipherOptionProtosMethod =
-        PBHelper.class.getMethod("convertCipherOptionProtos", List.class);
-    final Method addAllCipherOptionMethod =
-        DataTransferEncryptorMessageProto.Builder.class.getMethod("addAllCipherOption",
-          Iterable.class);
-    final Method getCipherOptionListMethod =
-        DataTransferEncryptorMessageProto.class.getMethod("getCipherOptionList");
-    return new CipherHelper() {
-
-      @Override
-      public byte[] getOutKey(Object cipherOption) {
-        try {
-          return (byte[]) getOutKeyMethod.invoke(cipherOption);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public byte[] getOutIv(Object cipherOption) {
-        try {
-          return (byte[]) getOutIvMethod.invoke(cipherOption);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public byte[] getInKey(Object cipherOption) {
-        try {
-          return (byte[]) getInKeyMethod.invoke(cipherOption);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public byte[] getInIv(Object cipherOption) {
-        try {
-          return (byte[]) getInIvMethod.invoke(cipherOption);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public Object getCipherSuite(Object cipherOption) {
-        try {
-          return getCipherSuiteMethod.invoke(cipherOption);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public List<Object> getCipherOptions(Configuration conf) throws IOException {
-        // Negotiate cipher suites if configured. Currently, the only supported
-        // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
-        // values for future expansion.
-        String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
-        if (cipherSuites == null || cipherSuites.isEmpty()) {
-          return null;
-        }
-        if (!cipherSuites.equals(AES_CTR_NOPADDING)) {
-          throw new IOException(String.format("Invalid cipher suite, %s=%s",
-            DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
-        }
-        Object option;
-        try {
-          option = cipherOptionConstructor.newInstance(aesCipherSuite);
-        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-        List<Object> cipherOptions = Lists.newArrayListWithCapacity(1);
-        cipherOptions.add(option);
-        return cipherOptions;
-      }
-
-      private Object unwrap(Object option, SaslClient saslClient) throws IOException {
-        byte[] inKey = getInKey(option);
-        if (inKey != null) {
-          inKey = saslClient.unwrap(inKey, 0, inKey.length);
-        }
-        byte[] outKey = getOutKey(option);
-        if (outKey != null) {
-          outKey = saslClient.unwrap(outKey, 0, outKey.length);
-        }
-        try {
-          return cipherOptionWithKeyAndIvConstructor.newInstance(getCipherSuite(option), inKey,
-            getInIv(option), outKey, getOutIv(option));
-        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @SuppressWarnings("unchecked")
-      @Override
-      public Object getCipherOption(DataTransferEncryptorMessageProto proto,
-          boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
-        List<Object> cipherOptions;
-        try {
-          cipherOptions =
-              (List<Object>) convertCipherOptionProtosMethod.invoke(null,
-                getCipherOptionListMethod.invoke(proto));
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-        if (cipherOptions == null || cipherOptions.isEmpty()) {
-          return null;
-        }
-        Object cipherOption = cipherOptions.get(0);
-        return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
-      }
-
-      @Override
-      public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
-        try {
-          addAllCipherOptionMethod.invoke(builder,
-            convertCipherOptionsMethod.invoke(null, cipherOptions));
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private static CipherHelper createCipherHelper() {
-    Class<?> cipherOptionClass;
-    try {
-      cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption");
-    } catch (ClassNotFoundException e) {
-      LOG.warn("No CipherOption class found, should be hadoop 2.5-");
-      return createCipherHelper25();
-    }
-    try {
-      return createCipherHelper27(cipherOptionClass);
-    } catch (NoSuchMethodException | ClassNotFoundException e) {
-      throw new Error(e);
-    }
-  }
-
-  static {
-    SASL_ADAPTOR = createSaslAdaptor();
-    CIPHER_HELPER = createCipherHelper();
-  }
-
-  /**
-   * Sets user name and password when asked by the client-side SASL object.
-   */
-  private static final class SaslClientCallbackHandler implements CallbackHandler {
-
-    private final char[] password;
-    private final String userName;
-
-    /**
-     * Creates a new SaslClientCallbackHandler.
-     * @param userName SASL user name
-     * @param password SASL password
-     */
-    public SaslClientCallbackHandler(String userName, char[] password) {
-      this.password = password;
-      this.userName = userName;
-    }
-
-    @Override
-    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
-      NameCallback nc = null;
-      PasswordCallback pc = null;
-      RealmCallback rc = null;
-      for (Callback callback : callbacks) {
-        if (callback instanceof RealmChoiceCallback) {
-          continue;
-        } else if (callback instanceof NameCallback) {
-          nc = (NameCallback) callback;
-        } else if (callback instanceof PasswordCallback) {
-          pc = (PasswordCallback) callback;
-        } else if (callback instanceof RealmCallback) {
-          rc = (RealmCallback) callback;
-        } else {
-          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
-        }
-      }
-      if (nc != null) {
-        nc.setName(userName);
-      }
-      if (pc != null) {
-        pc.setPassword(password);
-      }
-      if (rc != null) {
-        rc.setText(rc.getDefaultText());
-      }
-    }
-  }
-
-  private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
-
-    private final Configuration conf;
-
-    private final Map<String, String> saslProps;
-
-    private final SaslClient saslClient;
-
-    private final int timeoutMs;
-
-    private final Promise<Void> promise;
-
-    private int step = 0;
-
-    public SaslNegotiateHandler(Configuration conf, String username, char[] password,
-        Map<String, String> saslProps, int timeoutMs, Promise<Void> promise) throws SaslException {
-      this.conf = conf;
-      this.saslProps = saslProps;
-      this.saslClient =
-          Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, SERVER_NAME,
-            saslProps, new SaslClientCallbackHandler(username, password));
-      this.timeoutMs = timeoutMs;
-      this.promise = promise;
-    }
-
-    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
-      sendSaslMessage(ctx, payload, null);
-    }
-
-    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List<Object> options)
-        throws IOException {
-      DataTransferEncryptorMessageProto.Builder builder =
-          DataTransferEncryptorMessageProto.newBuilder();
-      builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
-      if (payload != null) {
-        builder.setPayload(ByteString.copyFrom(payload));
-      }
-      if (options != null) {
-        CIPHER_HELPER.addCipherOptions(builder, options);
-      }
-      DataTransferEncryptorMessageProto proto = builder.build();
-      int size = proto.getSerializedSize();
-      size += CodedOutputStream.computeRawVarint32Size(size);
-      ByteBuf buf = ctx.alloc().buffer(size);
-      proto.writeDelimitedTo(new ByteBufOutputStream(buf));
-      ctx.write(buf);
-    }
-
-    @Override
-    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-      ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
-      sendSaslMessage(ctx, new byte[0]);
-      ctx.flush();
-      step++;
-    }
-
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-      saslClient.dispose();
-    }
-
-    private void check(DataTransferEncryptorMessageProto proto) throws IOException {
-      if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
-        throw new InvalidEncryptionKeyException(proto.getMessage());
-      } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
-        throw new IOException(proto.getMessage());
-      }
-    }
-
-    private String getNegotiatedQop() {
-      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
-    }
-
-    private boolean isNegotiatedQopPrivacy() {
-      String qop = getNegotiatedQop();
-      return qop != null && "auth-conf".equalsIgnoreCase(qop);
-    }
-
-    private boolean requestedQopContainsPrivacy() {
-      Set<String> requestedQop =
-          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
-      return requestedQop.contains("auth-conf");
-    }
-
-    private void checkSaslComplete() throws IOException {
-      if (!saslClient.isComplete()) {
-        throw new IOException("Failed to complete SASL handshake");
-      }
-      Set<String> requestedQop =
-          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
-      String negotiatedQop = getNegotiatedQop();
-      LOG.debug("Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = "
-          + negotiatedQop);
-      if (!requestedQop.contains(negotiatedQop)) {
-        throw new IOException(String.format("SASL handshake completed, but "
-            + "channel does not have acceptable quality of protection, "
-            + "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
-      }
-    }
-
-    private boolean useWrap() {
-      String qop = getNegotiatedQop();
-      return qop != null && !"auth".equalsIgnoreCase(qop);
-    }
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
-      if (msg instanceof DataTransferEncryptorMessageProto) {
-        DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
-        check(proto);
-        byte[] challenge = proto.getPayload().toByteArray();
-        byte[] response = saslClient.evaluateChallenge(challenge);
-        switch (step) {
-          case 1: {
-            List<Object> cipherOptions = null;
-            if (requestedQopContainsPrivacy()) {
-              cipherOptions = CIPHER_HELPER.getCipherOptions(conf);
-            }
-            sendSaslMessage(ctx, response, cipherOptions);
-            ctx.flush();
-            step++;
-            break;
-          }
-          case 2: {
-            assert response == null;
-            checkSaslComplete();
-            Object cipherOption =
-                CIPHER_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
-            ChannelPipeline p = ctx.pipeline();
-            while (p.first() != null) {
-              p.removeFirst();
-            }
-            if (cipherOption != null) {
-              CryptoCodec codec = new CryptoCodec(conf, cipherOption);
-              p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
-            } else {
-              if (useWrap()) {
-                p.addLast(new SaslWrapHandler(saslClient), new LengthFieldBasedFrameDecoder(
-                    Integer.MAX_VALUE, 0, 4), new SaslUnwrapHandler(saslClient));
-              }
-            }
-            promise.trySuccess(null);
-            break;
-          }
-          default:
-            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
-        }
-      } else {
-        ctx.fireChannelRead(msg);
-      }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-      promise.tryFailure(cause);
-    }
-
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-      if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
-        promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
-      } else {
-        super.userEventTriggered(ctx, evt);
-      }
-    }
-  }
-
-  private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
-
-    private final SaslClient saslClient;
-
-    public SaslUnwrapHandler(SaslClient saslClient) {
-      this.saslClient = saslClient;
-    }
-
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-      saslClient.dispose();
-    }
-
-    @Override
-    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
-      // skip the 4-byte length prefix, which LengthFieldBasedFrameDecoder leaves in place
-      msg.skipBytes(4);
-      byte[] b = new byte[msg.readableBytes()];
-      msg.readBytes(b);
-      ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
-    }
-  }
-
-  private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
-
-    private final SaslClient saslClient;
-
-    private CompositeByteBuf cBuf;
-
-    public SaslWrapHandler(SaslClient saslClient) {
-      this.saslClient = saslClient;
-    }
-
-    @Override
-    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-      cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
-    }
-
-    @Override
-    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
-        throws Exception {
-      if (msg instanceof ByteBuf) {
-        ByteBuf buf = (ByteBuf) msg;
-        cBuf.addComponent(buf);
-        cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
-      } else {
-        ctx.write(msg);
-      }
-    }
-
-    @Override
-    public void flush(ChannelHandlerContext ctx) throws Exception {
-      if (cBuf.isReadable()) {
-        byte[] b = new byte[cBuf.readableBytes()];
-        cBuf.readBytes(b);
-        cBuf.discardReadComponents();
-        byte[] wrapped = saslClient.wrap(b, 0, b.length);
-        ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
-        buf.writeInt(wrapped.length);
-        buf.writeBytes(wrapped);
-        ctx.write(buf);
-      }
-      ctx.flush();
-    }
-
-    @Override
-    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
-      cBuf.release();
-      cBuf = null;
-    }
-  }
-
-  private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
-
-    private final CryptoCodec codec;
-
-    public DecryptHandler(CryptoCodec codec) {
-      this.codec = codec;
-    }
-
-    @Override
-    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
-      ByteBuf inBuf;
-      boolean release = false;
-      if (msg.nioBufferCount() == 1) {
-        inBuf = msg;
-      } else {
-        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
-        msg.readBytes(inBuf);
-        release = true;
-      }
-      ByteBuffer inBuffer = inBuf.nioBuffer();
-      ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
-      ByteBuffer outBuffer = outBuf.nioBuffer();
-      codec.decrypt(inBuffer, outBuffer);
-      outBuf.writerIndex(inBuf.readableBytes());
-      if (release) {
-        inBuf.release();
-      }
-      ctx.fireChannelRead(outBuf);
-    }
-  }
-
-  private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
-
-    private final CryptoCodec codec;
-
-    public EncryptHandler(CryptoCodec codec) {
-      super(false);
-      this.codec = codec;
-    }
-
-    @Override
-    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
-        throws Exception {
-      if (preferDirect) {
-        return ctx.alloc().directBuffer(msg.readableBytes());
-      } else {
-        return ctx.alloc().buffer(msg.readableBytes());
-      }
-    }
-
-    @Override
-    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
-      ByteBuf inBuf;
-      boolean release = false;
-      if (msg.nioBufferCount() == 1) {
-        inBuf = msg;
-      } else {
-        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
-        msg.readBytes(inBuf);
-        release = true;
-      }
-      ByteBuffer inBuffer = inBuf.nioBuffer();
-      ByteBuffer outBuffer = out.nioBuffer();
-      codec.encrypt(inBuffer, outBuffer);
-      out.writerIndex(inBuf.readableBytes());
-      if (release) {
-        inBuf.release();
-      }
-    }
-  }
-
-  private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
-    return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
-        + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
-  }
-
-  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
-    return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
-  }
-
-  private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
-    return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8);
-  }
-
-  private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
-    return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8)
-        .toCharArray();
-  }
-
-  private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
-    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
-    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
-    saslProps.put(Sasl.SERVER_AUTH, "true");
-    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
-    return saslProps;
-  }
-
-  private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
-      String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise) {
-    try {
-      channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
-        new ProtobufVarint32FrameDecoder(),
-        new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
-        new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise));
-    } catch (SaslException e) {
-      saslPromise.tryFailure(e);
-    }
-  }
-
-  static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
-      int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
-      Promise<Void> saslPromise) {
-    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(client);
-    TrustedChannelResolver trustedChannelResolver = SASL_ADAPTOR.getTrustedChannelResolver(client);
-    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(client);
-    InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
-    if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
-      saslPromise.trySuccess(null);
-      return;
-    }
-    DataEncryptionKey encryptionKey;
-    try {
-      encryptionKey = SASL_ADAPTOR.createDataEncryptionKey(client);
-    } catch (Exception e) {
-      saslPromise.tryFailure(e);
-      return;
-    }
-    if (encryptionKey != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = "
-            + dnInfo);
-      }
-      doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
-        encryptionKeyToPassword(encryptionKey.encryptionKey),
-        createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise);
-    } else if (!UserGroupInformation.isSecurityEnabled()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
-            + ", datanodeId = " + dnInfo);
-      }
-      saslPromise.trySuccess(null);
-    } else if (dnInfo.getXferPort() < 1024) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("SASL client skipping handshake in secured configuration with "
-            + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
-      }
-      saslPromise.trySuccess(null);
-    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("SASL client skipping handshake in secured configuration with "
-            + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
-      }
-      saslPromise.trySuccess(null);
-    } else if (saslPropsResolver != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("SASL client doing general handshake for addr = " + addr + ", datanodeId = "
-            + dnInfo);
-      }
-      doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
-        buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise);
-    } else {
-      // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
-      // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
-      // edge case.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
-            + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
-      }
-      saslPromise.trySuccess(null);
-    }
-  }
-
-}
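
For reference, the SaslNegotiateHandler above is an asynchronous rendering of the standard two-round javax.security.sasl client loop, unrolled into the step counter: step 1 answers the first challenge (optionally attaching cipher options), step 2 verifies completion and rewires the pipeline. A minimal blocking sketch of that loop follows; the Transport interface and the class name are invented for illustration and are not part of this patch.

import java.io.IOException;

import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;

public final class SaslClientLoopSketch {

  /** Placeholder transport; the real code frames these tokens as protobuf messages. */
  interface Transport {
    void send(byte[] token) throws IOException;
    byte[] receive() throws IOException;
  }

  static void negotiate(SaslClient client, Transport transport) throws IOException {
    // DIGEST-MD5 has no initial response, so the first message carries an empty
    // token, mirroring sendSaslMessage(ctx, new byte[0]) in handlerAdded above.
    transport.send(client.hasInitialResponse()
        ? client.evaluateChallenge(new byte[0]) : new byte[0]);
    while (!client.isComplete()) {
      byte[] response = client.evaluateChallenge(transport.receive());
      if (response != null) {
        transport.send(response);
      }
    }
    // The handler then validates the result, as checkSaslComplete() does.
    String qop = (String) client.getNegotiatedProperty(Sasl.QOP);
    if (qop == null) {
      throw new IOException("SASL handshake completed but no QOP negotiated");
    }
  }
}

In the real handler the negotiated QOP then decides whether the pipeline gets EncryptHandler/DecryptHandler (auth-conf with a negotiated cipher option), SaslWrapHandler/SaslUnwrapHandler (any other non-auth QOP), or no protection at all.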

http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java
new file mode 100644
index 0000000..58b5301
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import java.nio.channels.CompletionHandler;
+import java.util.concurrent.ExecutionException;
+
+public final class FanOutOneBlockAsyncDFSOutputFlushHandler
+    implements CompletionHandler<Long, Void> {
+
+  private long size;
+
+  private Throwable error;
+
+  private boolean finished;
+
+  @Override
+  public synchronized void completed(Long result, Void attachment) {
+    size = result.longValue();
+    finished = true;
+    notifyAll();
+  }
+
+  @Override
+  public synchronized void failed(Throwable exc, Void attachment) {
+    error = exc;
+    finished = true;
+    notifyAll();
+  }
+
+  public synchronized long get() throws InterruptedException, ExecutionException {
+    while (!finished) {
+      wait();
+    }
+    if (error != null) {
+      throw new ExecutionException(error);
+    }
+    return size;
+  }
+
+  /**
+   * Not synchronized: call only after get() has returned and before the next
+   * flush is scheduled on the event loop.
+   */
+  public void reset() {
+    size = 0L;
+    error = null;
+    finished = false;
+  }
+}
\ No newline at end of file
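
The handler above is the adapter the new tests use to turn the callback-style flush API into a blocking call: the test thread parks in get() while the event loop thread invokes completed() or failed(). A usage sketch, essentially the writeAndVerify helper in the next test file, with out, eventLoop and b standing for an open FanOutOneBlockAsyncDFSOutput, its event loop and a payload (a sketch, not patch code):

static long writeAndFlush(final FanOutOneBlockAsyncDFSOutput out, EventLoop eventLoop,
    final byte[] b) throws InterruptedException, ExecutionException {
  final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
      new FanOutOneBlockAsyncDFSOutputFlushHandler();
  eventLoop.execute(new Runnable() {

    @Override
    public void run() {
      out.write(b, 0, b.length);       // buffer the payload on the event loop
      out.flush(null, handler, false); // async; completed()/failed() fires later
    }
  });
  return handler.get();                // park until the callback fires
}

A single handler instance can be reused across flushes via reset(), as testRecover below does.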

http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
new file mode 100644
index 0000000..2be3b28
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.Daemon;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ MiscTests.class, MediumTests.class })
+public class TestFanOutOneBlockAsyncDFSOutput {
+
+  private static final Log LOG = LogFactory.getLog(TestFanOutOneBlockAsyncDFSOutput.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static DistributedFileSystem FS;
+
+  private static EventLoopGroup EVENT_LOOP_GROUP;
+
+  private static final int READ_TIMEOUT_MS = 2000;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
+    TEST_UTIL.startMiniDFSCluster(3);
+    FS = TEST_UTIL.getDFSCluster().getFileSystem();
+    EVENT_LOOP_GROUP = new NioEventLoopGroup();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException, InterruptedException {
+    if (EVENT_LOOP_GROUP != null) {
+      EVENT_LOOP_GROUP.shutdownGracefully().sync();
+    }
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
+  private void ensureAllDatanodeAlive() throws InterruptedException {
+    // FanOutOneBlockAsyncDFSOutputHelper.createOutput is fail-fast, so make sure we can create a
+    // FanOutOneBlockAsyncDFSOutput again after a datanode restart, otherwise later tests may fail.
+    for (;;) {
+      try {
+        FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
+          new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(),
+          EVENT_LOOP_GROUP.next());
+        out.close();
+        break;
+      } catch (IOException e) {
+        Thread.sleep(100);
+      }
+    }
+  }
+
+  static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
+      final FanOutOneBlockAsyncDFSOutput out)
+          throws IOException, InterruptedException, ExecutionException {
+    final byte[] b = new byte[10];
+    ThreadLocalRandom.current().nextBytes(b);
+    final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
+        new FanOutOneBlockAsyncDFSOutputFlushHandler();
+    eventLoop.execute(new Runnable() {
+
+      @Override
+      public void run() {
+        out.write(b, 0, b.length);
+        out.flush(null, handler, false);
+      }
+    });
+    assertEquals(b.length, handler.get());
+    out.close();
+    assertEquals(b.length, dfs.getFileStatus(f).getLen());
+    byte[] actual = new byte[b.length];
+    try (FSDataInputStream in = dfs.open(f)) {
+      in.readFully(actual);
+    }
+    assertArrayEquals(b, actual);
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException, ExecutionException {
+    Path f = new Path("/" + name.getMethodName());
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+    writeAndVerify(eventLoop, FS, f, out);
+  }
+
+  @Test
+  public void testRecover() throws IOException, InterruptedException, ExecutionException {
+    Path f = new Path("/" + name.getMethodName());
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+    final byte[] b = new byte[10];
+    ThreadLocalRandom.current().nextBytes(b);
+    final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
+        new FanOutOneBlockAsyncDFSOutputFlushHandler();
+    eventLoop.execute(new Runnable() {
+
+      @Override
+      public void run() {
+        out.write(b, 0, b.length);
+        out.flush(null, handler, false);
+      }
+    });
+    handler.get();
+    // restart one datanode, which breaks one of the connections
+    TEST_UTIL.getDFSCluster().restartDataNode(0);
+    try {
+      handler.reset();
+      eventLoop.execute(new Runnable() {
+
+        @Override
+        public void run() {
+          out.write(b, 0, b.length);
+          out.flush(null, handler, false);
+        }
+      });
+      try {
+        handler.get();
+        fail("flush should fail");
+      } catch (ExecutionException e) {
+        // we restarted one datanode so the flush should fail
+        LOG.info("expected exception caught", e);
+      }
+      out.recoverAndClose(null);
+      assertEquals(b.length, FS.getFileStatus(f).getLen());
+      byte[] actual = new byte[b.length];
+      try (FSDataInputStream in = FS.open(f)) {
+        in.readFully(actual);
+      }
+      assertArrayEquals(b, actual);
+    } finally {
+      ensureAllDatanodeAlive();
+    }
+  }
+
+  @Test
+  public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
+    Path f = new Path("/" + name.getMethodName());
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+    Thread.sleep(READ_TIMEOUT_MS * 2);
+    // the connection to the datanode should still be alive.
+    writeAndVerify(eventLoop, FS, f, out);
+  }
+
+  /**
+   * This is important for fencing when recovering from an RS crash.
+   */
+  @Test
+  public void testCreateParentFailed() throws IOException {
+    Path f = new Path("/" + name.getMethodName() + "/test");
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    try {
+      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
+        FS.getDefaultBlockSize(), eventLoop);
+      fail("should fail because the parent directory does not exist");
+    } catch (RemoteException e) {
+      LOG.info("expected exception caught", e);
+      assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException);
+    }
+  }
+
+  @Test
+  public void testConnectToDatanodeFailed()
+      throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
+      InvocationTargetException, InterruptedException, NoSuchFieldException {
+    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
+    xceiverServerDaemonField.setAccessible(true);
+    Class<?> xceiverServerClass = Class
+        .forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
+    numPeersMethod.setAccessible(true);
+    // take one datanode down so connections to it fail
+    TEST_UTIL.getDFSCluster().getDataNodes().get(0).shutdownDatanode(true);
+    try {
+      Path f = new Path("/test");
+      EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+      try {
+        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
+          FS.getDefaultBlockSize(), eventLoop);
+        fail("should fail with connection error");
+      } catch (IOException e) {
+        LOG.info("expected exception caught", e);
+      }
+      for (DataNode dn : TEST_UTIL.getDFSCluster().getDataNodes()) {
+        Daemon daemon = (Daemon) xceiverServerDaemonField.get(dn);
+        assertEquals(0, numPeersMethod.invoke(daemon.getRunnable()));
+      }
+    } finally {
+      TEST_UTIL.getDFSCluster().restartDataNode(0);
+      ensureAllDatanodeAlive();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
new file mode 100644
index 0000000..04cb0ef
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MiscTests.class, SmallTests.class })
+public class TestLocalAsyncOutput {
+
+  private static final EventLoopGroup GROUP = new NioEventLoopGroup();
+
+  private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility();
+
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException {
+    TEST_UTIL.cleanupTestDir();
+    GROUP.shutdownGracefully();
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException, ExecutionException {
+    Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
+    FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
+    AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
+      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next());
+    byte[] b = new byte[10];
+    ThreadLocalRandom.current().nextBytes(b);
+    FanOutOneBlockAsyncDFSOutputFlushHandler handler =
+        new FanOutOneBlockAsyncDFSOutputFlushHandler();
+    out.write(b);
+    out.flush(null, handler, true);
+    assertEquals(b.length, handler.get());
+    out.close();
+    assertEquals(b.length, fs.getFileStatus(f).getLen());
+    byte[] actual = new byte[b.length];
+    try (FSDataInputStream in = fs.open(f)) {
+      in.readFully(actual);
+    }
+    assertArrayEquals(b, actual);
+  }
+}
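
TestLocalAsyncOutput exercises the fallback path of AsyncFSOutputHelper: for a DistributedFileSystem the helper is expected to return the fan-out output tested above, while any other FileSystem (the local one here) gets an ordinary output stream wrapped behind the same AsyncFSOutput interface. A simplified sketch of that dispatch, inferred from how the two tests call it rather than quoted from the patch (the class name is invented, the asyncfs types are assumed to be in scope, and the non-HDFS branch is elided):

import java.io.IOException;

import io.netty.channel.EventLoop;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public final class AsyncFSOutputHelperSketch {

  public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
      boolean createParent, short replication, long blockSize, EventLoop eventLoop)
      throws IOException {
    if (fs instanceof DistributedFileSystem) {
      // HDFS gets the fan-out implementation exercised by the tests above.
      return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
        overwrite, createParent, replication, blockSize, eventLoop);
    }
    // Any other FileSystem would be served by wrapping fs.create(...) behind
    // the AsyncFSOutput API; that branch is elided in this sketch.
    throw new UnsupportedOperationException("see AsyncFSOutputHelper in the patch");
  }
}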

