hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bus...@apache.org
Subject hbase git commit: HBASE-16110 AsyncFS WAL doesn't work with Hadoop 2.8+
Date Mon, 18 Jul 2016 12:01:31 GMT
Repository: hbase
Updated Branches:
  refs/heads/master a1cc2c4bf -> 515c499f9


HBASE-16110 AsyncFS WAL doesn't work with Hadoop 2.8+

Signed-off-by: Sean Busbey <busbey@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/515c499f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/515c499f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/515c499f

Branch: refs/heads/master
Commit: 515c499f951b864035c5772906b2c0750d9a608f
Parents: a1cc2c4
Author: zhangduo <zhangduo@apache.org>
Authored: Tue Jul 12 11:15:08 2016 +0800
Committer: Sean Busbey <busbey@apache.org>
Committed: Mon Jul 18 06:54:20 2016 -0500

----------------------------------------------------------------------
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java   |   4 +-
 .../FanOutOneBlockAsyncDFSOutputHelper.java     | 565 ++++++++++++-------
 .../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 213 +++----
 3 files changed, 470 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/515c499f/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 8dd7f5e..9aab924 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import static io.netty.handler.timeout.IdleState.READER_IDLE;
 import static io.netty.handler.timeout.IdleState.WRITER_IDLE;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
@@ -71,7 +72,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
@@ -339,7 +339,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     this.alloc = alloc;
     this.buf = alloc.directBuffer();
     this.state = State.STREAMING;
-    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT));
+    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/515c499f/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 2e88ff2..51c48ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -99,15 +99,15 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -128,8 +128,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   // copied from DFSPacket since it is package private.
   public static final long HEART_BEAT_SEQNO = -1L;
 
-  // helper class for creating DataChecksum object.
-  private static final Method CREATE_CHECKSUM;
+  // Timeouts for communicating with DataNode for streaming writes/reads
+  public static final int READ_TIMEOUT = 60 * 1000;
+  public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
+  public static final int WRITE_TIMEOUT = 8 * 60 * 1000;
 
   // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
   // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
@@ -161,6 +163,17 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static final FileCreater FILE_CREATER;
 
+  // helper class for calling the addBlock method on namenode. There is an addBlockFlags parameter
+  // in hadoop 2.8 or later. See createBlockAdder for more details.
+  private interface BlockAdder {
+
+    LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+        ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
+        throws IOException;
+  }
+
+  private static final BlockAdder BLOCK_ADDER;
+
   // helper class for add or remove lease from DFSClient. Hadoop 2.4 use src as the Map's key, and
   // hadoop 2.5 or after use inodeId. See createLeaseManager for more details.
   private interface LeaseManager {
@@ -181,156 +194,182 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
 
-  private static DFSClientAdaptor createDFSClientAdaptor() {
-    try {
-      final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
-      isClientRunningMethod.setAccessible(true);
-      return new DFSClientAdaptor() {
+  // helper class for converting protos.
+  private interface PBHelper {
 
-        @Override
-        public boolean isClientRunning(DFSClient client) {
-          try {
-            return (Boolean) isClientRunningMethod.invoke(client);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
+    ExtendedBlockProto convert(final ExtendedBlock b);
+
+    TokenProto convert(Token<?> tok);
+  }
+
+  private static final PBHelper PB_HELPER;
+
+  // helper class for creating data checksum.
+  private interface ChecksumCreater {
+    DataChecksum createChecksum(Object conf);
+  }
+
+  private static final ChecksumCreater CHECKSUM_CREATER;
+
+  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
+    final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
+    isClientRunningMethod.setAccessible(true);
+    return new DFSClientAdaptor() {
+
+      @Override
+      public boolean isClientRunning(DFSClient client) {
+        try {
+          return (Boolean) isClientRunningMethod.invoke(client);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
         }
-      };
-    } catch (NoSuchMethodException e) {
-      throw new Error(e);
-    }
+      }
+    };
   }
 
-  private static LeaseManager createLeaseManager() {
-    try {
-      final Method beginFileLeaseMethod =
-          DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
-      beginFileLeaseMethod.setAccessible(true);
-      final Method endFileLeaseMethod =
-          DFSClient.class.getDeclaredMethod("endFileLease", long.class);
-      endFileLeaseMethod.setAccessible(true);
-      return new LeaseManager() {
+  private static LeaseManager createLeaseManager25() throws NoSuchMethodException {
+    final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
+      long.class, DFSOutputStream.class);
+    beginFileLeaseMethod.setAccessible(true);
+    final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
+    endFileLeaseMethod.setAccessible(true);
+    return new LeaseManager() {
 
-        @Override
-        public void begin(DFSClient client, String src, long inodeId) {
-          try {
-            beginFileLeaseMethod.invoke(client, inodeId, null);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
+      @Override
+      public void begin(DFSClient client, String src, long inodeId) {
+        try {
+          beginFileLeaseMethod.invoke(client, inodeId, null);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
         }
+      }
 
-        @Override
-        public void end(DFSClient client, String src, long inodeId) {
-          try {
-            endFileLeaseMethod.invoke(client, inodeId);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
+      @Override
+      public void end(DFSClient client, String src, long inodeId) {
+        try {
+          endFileLeaseMethod.invoke(client, inodeId);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
         }
-      };
-    } catch (NoSuchMethodException e) {
-      LOG.warn("No inodeId related lease methods found, should be hadoop 2.4-", e);
-    }
-    try {
-      final Method beginFileLeaseMethod =
-          DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
-      beginFileLeaseMethod.setAccessible(true);
-      final Method endFileLeaseMethod =
-          DFSClient.class.getDeclaredMethod("endFileLease", String.class);
-      endFileLeaseMethod.setAccessible(true);
-      return new LeaseManager() {
+      }
+    };
+  }
 
-        @Override
-        public void begin(DFSClient client, String src, long inodeId) {
-          try {
-            beginFileLeaseMethod.invoke(client, src, null);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
+  private static LeaseManager createLeaseManager24() throws NoSuchMethodException {
+    final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
+      String.class, DFSOutputStream.class);
+    beginFileLeaseMethod.setAccessible(true);
+    final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
+      String.class);
+    endFileLeaseMethod.setAccessible(true);
+    return new LeaseManager() {
+
+      @Override
+      public void begin(DFSClient client, String src, long inodeId) {
+        try {
+          beginFileLeaseMethod.invoke(client, src, null);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
         }
+      }
 
-        @Override
-        public void end(DFSClient client, String src, long inodeId) {
-          try {
-            endFileLeaseMethod.invoke(client, src);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
+      @Override
+      public void end(DFSClient client, String src, long inodeId) {
+        try {
+          endFileLeaseMethod.invoke(client, src);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
         }
-      };
+      }
+    };
+  }
+
+  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+    try {
+      return createLeaseManager25();
     } catch (NoSuchMethodException e) {
-      throw new Error(e);
+      LOG.debug("No inodeId related lease methods found, should be hadoop 2.4-", e);
     }
+    return createLeaseManager24();
   }
 
-  private static PipelineAckStatusGetter createPipelineAckStatusGetter() {
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
+      throws NoSuchMethodException {
+    final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
+    @SuppressWarnings("rawtypes")
+    Class<? extends Enum> ecnClass;
     try {
-      final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
-      @SuppressWarnings("rawtypes")
-      Class<? extends Enum> ecnClass;
-      try {
-        ecnClass =
-            Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
-                .asSubclass(Enum.class);
-      } catch (ClassNotFoundException e) {
-        throw new Error(e);
-      }
-      @SuppressWarnings("unchecked")
-      final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
-      final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
-      final Method combineHeaderMethod =
-          PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
-      final Method getStatusFromHeaderMethod =
-          PipelineAck.class.getMethod("getStatusFromHeader", int.class);
-      return new PipelineAckStatusGetter() {
+      ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
+          .asSubclass(Enum.class);
+    } catch (ClassNotFoundException e) {
+      final String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
+          + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+          + "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
+    }
+    @SuppressWarnings("unchecked")
+    final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
+    final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
+    final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass,
+      Status.class);
+    final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader",
+      int.class);
+    return new PipelineAckStatusGetter() {
 
-        @Override
-        public Status get(PipelineAckProto ack) {
-          try {
-            @SuppressWarnings("unchecked")
-            List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
-            Integer headerFlag;
-            if (flagList.isEmpty()) {
-              Status reply = (Status) getReplyMethod.invoke(ack, 0);
-              headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
-            } else {
-              headerFlag = flagList.get(0);
-            }
-            return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
+      @Override
+      public Status get(PipelineAckProto ack) {
+        try {
+          @SuppressWarnings("unchecked")
+          List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
+          Integer headerFlag;
+          if (flagList.isEmpty()) {
+            Status reply = (Status) getReplyMethod.invoke(ack, 0);
+            headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
+          } else {
+            headerFlag = flagList.get(0);
           }
+          return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
         }
-      };
-    } catch (NoSuchMethodException e) {
-      LOG.warn("Can not get expected methods, should be hadoop 2.6-", e);
-    }
-    try {
-      final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
-      return new PipelineAckStatusGetter() {
+      }
+    };
+  }
 
-        @Override
-        public Status get(PipelineAckProto ack) {
-          try {
-            return (Status) getStatusMethod.invoke(ack, 0);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
+      throws NoSuchMethodException {
+    final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
+    return new PipelineAckStatusGetter() {
+
+      @Override
+      public Status get(PipelineAckProto ack) {
+        try {
+          return (Status) getStatusMethod.invoke(ack, 0);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
         }
-      };
+      }
+    };
+  }
+
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter()
+      throws NoSuchMethodException {
+    try {
+      return createPipelineAckStatusGetter27();
     } catch (NoSuchMethodException e) {
-      throw new Error(e);
+      LOG.debug("Can not get expected methods, should be hadoop 2.6-", e);
     }
+    return createPipelineAckStatusGetter26();
   }
 
   private static StorageTypeSetter createStorageTypeSetter() {
     final Method setStorageTypeMethod;
     try {
-      setStorageTypeMethod =
-          OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
+      setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType",
+        StorageTypeProto.class);
     } catch (NoSuchMethodException e) {
-      LOG.warn("noSetStorageType method found, should be hadoop 2.5-", e);
+      LOG.debug("noSetStorageType method found, should be hadoop 2.5-", e);
       return new StorageTypeSetter() {
 
         @Override
@@ -359,7 +398,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     };
   }
 
-  private static FileCreater createFileCreater() {
+  private static FileCreater createFileCreater() throws ClassNotFoundException,
+      NoSuchMethodException, IllegalAccessException, InvocationTargetException {
     for (Method method : ClientProtocol.class.getMethods()) {
       if (method.getName().equals("create")) {
         final Method createMethod = method;
@@ -372,8 +412,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
                 String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
                 short replication, long blockSize) throws IOException {
               try {
-                return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
-                  flag, createParent, replication, blockSize);
+                return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
+                  createParent, replication, blockSize);
               } catch (IllegalAccessException e) {
                 throw new RuntimeException(e);
               } catch (InvocationTargetException e) {
@@ -383,36 +423,159 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
             }
           };
         } else {
-          try {
-            Class<?> cryptoProtocolVersionClass =
-                Class.forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
-            Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
-            final Object supported = supportedMethod.invoke(null);
-            return new FileCreater() {
+          Class<?> cryptoProtocolVersionClass = Class
+              .forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
+          Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
+          final Object supported = supportedMethod.invoke(null);
+          return new FileCreater() {
 
-              @Override
-              public HdfsFileStatus create(ClientProtocol namenode, String src,
-                  FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag,
-                  boolean createParent, short replication, long blockSize) throws IOException {
-                try {
-                  return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
-                    flag, createParent, replication, blockSize, supported);
-                } catch (IllegalAccessException e) {
-                  throw new RuntimeException(e);
-                } catch (InvocationTargetException e) {
-                  Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
-                  throw new RuntimeException(e);
-                }
+            @Override
+            public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
+                String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
+                short replication, long blockSize) throws IOException {
+              try {
+                return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
+                  createParent, replication, blockSize, supported);
+              } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+              } catch (InvocationTargetException e) {
+                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          };
+        }
+      }
+    }
+    throw new NoSuchMethodException("Can not find create method in ClientProtocol");
+  }
+
+  private static BlockAdder createBlockAdder() throws NoSuchMethodException {
+    for (Method method : ClientProtocol.class.getMethods()) {
+      if (method.getName().equals("addBlock")) {
+        final Method addBlockMethod = method;
+        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
+        if (paramTypes[paramTypes.length - 1] == String[].class) {
+          return new BlockAdder() {
+
+            @Override
+            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+                String[] favoredNodes) throws IOException {
+              try {
+                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+                  excludeNodes, fileId, favoredNodes);
+              } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+              } catch (InvocationTargetException e) {
+                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          };
+        } else {
+          return new BlockAdder() {
+
+            @Override
+            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+                String[] favoredNodes) throws IOException {
+              try {
+                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+                  excludeNodes, fileId, favoredNodes, null);
+              } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+              } catch (InvocationTargetException e) {
+                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+                throw new RuntimeException(e);
               }
-            };
-          } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
-              | InvocationTargetException e) {
-            throw new Error(e);
+            }
+          };
+        }
+      }
+    }
+    throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
+  }
+
+  private static PBHelper createPBHelper() throws NoSuchMethodException {
+    Class<?> helperClass;
+    try {
+      helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
+    } catch (ClassNotFoundException e) {
+      LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
+      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+    }
+    final Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
+    final Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
+    return new PBHelper() {
+
+      @Override
+      public ExtendedBlockProto convert(ExtendedBlock b) {
+        try {
+          return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public TokenProto convert(Token<?> tok) {
+        try {
+          return (TokenProto) convertTokenMethod.invoke(null, tok);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static ChecksumCreater createChecksumCreater28(Class<?> confClass)
+      throws NoSuchMethodException {
+    for (Method method : confClass.getMethods()) {
+      if (method.getName().equals("createChecksum")) {
+        final Method createChecksumMethod = method;
+        return new ChecksumCreater() {
+
+          @Override
+          public DataChecksum createChecksum(Object conf) {
+            try {
+              return (DataChecksum) createChecksumMethod.invoke(conf, (Object) null);
+            } catch (IllegalAccessException | InvocationTargetException e) {
+              throw new RuntimeException(e);
+            }
           }
+        };
+      }
+    }
+    throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
+  }
+
+  private static ChecksumCreater createChecksumCreater27(Class<?> confClass)
+      throws NoSuchMethodException {
+    final Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
+    createChecksumMethod.setAccessible(true);
+    return new ChecksumCreater() {
+
+      @Override
+      public DataChecksum createChecksum(Object conf) {
+        try {
+          return (DataChecksum) createChecksumMethod.invoke(conf);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
         }
       }
+    };
+  }
+
+  private static ChecksumCreater createChecksumCreater()
+      throws NoSuchMethodException, ClassNotFoundException {
+    try {
+      return createChecksumCreater28(
+        Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
+    } catch (ClassNotFoundException e) {
+      LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
     }
-    throw new Error("No create method found for " + ClientProtocol.class.getName());
+    return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
   }
 
   // cancel the processing if DFSClient is already closed.
@@ -432,17 +595,21 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   static {
     try {
-      CREATE_CHECKSUM = DFSClient.Conf.class.getDeclaredMethod("createChecksum");
-      CREATE_CHECKSUM.setAccessible(true);
-    } catch (NoSuchMethodException e) {
-      throw new Error(e);
+      PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
+      STORAGE_TYPE_SETTER = createStorageTypeSetter();
+      FILE_CREATER = createFileCreater();
+      BLOCK_ADDER = createBlockAdder();
+      LEASE_MANAGER = createLeaseManager();
+      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
+      PB_HELPER = createPBHelper();
+      CHECKSUM_CREATER = createChecksumCreater();
+    } catch (Exception e) {
+      final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+          + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+          + "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
     }
-
-    PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
-    STORAGE_TYPE_SETTER = createStorageTypeSetter();
-    FILE_CREATER = createFileCreater();
-    LEASE_MANAGER = createLeaseManager();
-    DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
   }
 
   static void beginFileLease(DFSClient client, String src, long inodeId) {
@@ -454,11 +621,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   }
 
   static DataChecksum createChecksum(DFSClient client) {
-    try {
-      return (DataChecksum) CREATE_CHECKSUM.invoke(client.getConf());
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      throw new RuntimeException(e);
-    }
+    return CHECKSUM_CREATER.createChecksum(client.getConf());
   }
 
   static Status getStatus(PipelineAckProto ack) {
@@ -530,8 +693,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
     OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
     int protoLen = proto.getSerializedSize();
-    ByteBuf buffer =
-        channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
+    ByteBuf buffer = channel.alloc()
+        .buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
     buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
     buffer.writeByte(Op.WRITE_BLOCK.code);
     proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
@@ -540,8 +703,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static void initialize(Configuration conf, final Channel channel,
       final DatanodeInfo dnInfo, final Enum<?> storageType,
-      final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs,
-      DFSClient client, Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
+      final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, DFSClient client,
+      Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) {
     Promise<Void> saslPromise = channel.eventLoop().newPromise();
     trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
     saslPromise.addListener(new FutureListener<Void>() {
@@ -560,32 +723,26 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   }
 
   private static List<Future<Channel>> connectToDataNodes(final Configuration conf,
-      final DFSClient client, String clientName, final LocatedBlock locatedBlock,
-      long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer,
-      EventLoop eventLoop) {
+      final DFSClient client, String clientName, final LocatedBlock locatedBlock, long maxBytesRcvd,
+      long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
     Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
     DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
-    boolean connectToDnViaHostname =
-        conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
-    final int timeoutMs =
-        conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT);
+    boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
+      DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+    final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
     ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
     blockCopy.setNumBytes(locatedBlock.getBlockSize());
-    ClientOperationHeaderProto header =
-        ClientOperationHeaderProto
-            .newBuilder()
-            .setBaseHeader(
-              BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy))
-                  .setToken(PBHelper.convert(locatedBlock.getBlockToken())))
-            .setClientName(clientName).build();
+    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
+        .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
+            .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
+        .setClientName(clientName).build();
     ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
-    final OpWriteBlockProto.Builder writeBlockProtoBuilder =
-        OpWriteBlockProto.newBuilder().setHeader(header)
-            .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
-            .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
-            .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
-            .setRequestedChecksum(checksumProto)
-            .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
+    final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
+        .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
+        .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
+        .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
+        .setRequestedChecksum(checksumProto)
+        .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
     List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
     for (int i = 0; i < datanodeInfos.length; i++) {
       final DatanodeInfo dnInfo = datanodeInfos[i];
@@ -642,14 +799,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     ClientProtocol namenode = client.getNamenode();
     HdfsFileStatus stat;
     try {
-      stat =
-          FILE_CREATER.create(
-            namenode,
-            src,
-            FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)),
-            clientName,
-            new EnumSetWritable<CreateFlag>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet
-                .of(CREATE)), createParent, replication, blockSize);
+      stat = FILE_CREATER.create(namenode, src,
+        FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
+        new EnumSetWritable<CreateFlag>(
+            overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
+        createParent, replication, blockSize);
     } catch (Exception e) {
       if (e instanceof RemoteException) {
         throw (RemoteException) e;
@@ -663,12 +817,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     List<Future<Channel>> futureList = null;
     try {
       DataChecksum summer = createChecksum(client);
-      locatedBlock =
-          namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null);
+      locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null, null,
+        stat.getFileId(), null);
       List<Channel> datanodeList = new ArrayList<>();
-      futureList =
-          connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE,
-            summer, eventLoop);
+      futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
+        PIPELINE_SETUP_CREATE, summer, eventLoop);
       for (Future<Channel> future : futureList) {
         // fail the creation if there are connection failures since we are fail-fast. The upper
         // layer should retry itself if needed.
@@ -712,8 +865,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
 
       @Override
-      public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException,
-          UnresolvedLinkException {
+      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
+          throws IOException, UnresolvedLinkException {
         return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
           blockSize, eventLoop);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/515c499f/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index 33e8841..0546253 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -86,7 +86,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.security.SaslPropertiesResolver;
@@ -112,8 +111,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
   private static final String NAME_DELIMITER = " ";
 
   @VisibleForTesting
-  static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
-    "dfs.encrypt.data.transfer.cipher.suites";
+  static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites";
 
   @VisibleForTesting
   static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding";
@@ -185,7 +183,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
       try {
         cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec");
       } catch (ClassNotFoundException e) {
-        LOG.warn("No CryptoCodec class found, should be hadoop 2.5-", e);
+        LOG.debug("No CryptoCodec class found, should be hadoop 2.5-", e);
       }
       if (cryptoCodecClass != null) {
         Method getInstanceMethod = null;
@@ -195,8 +193,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
             break;
           }
         }
-        CREATE_CODEC = getInstanceMethod;
         try {
+          if (getInstanceMethod == null) {
+            throw new NoSuchMethodException(
+                "Can not find suitable getInstance method in CryptoCodec");
+          }
+          CREATE_CODEC = getInstanceMethod;
           CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor");
           CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor");
 
@@ -207,11 +209,14 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
           Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor");
           INIT_DECRYPTOR = decryptorClass.getMethod("init", byte[].class, byte[].class);
           DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class);
-        } catch (NoSuchMethodException | ClassNotFoundException e) {
-          throw new Error(e);
+        } catch (Exception e) {
+          final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+              + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+              + "HBASE-16110 for more information.";
+          LOG.error(msg, e);
+          throw new Error(msg, e);
         }
       } else {
-        LOG.warn("Can not initialize CryptoCodec, should be hadoop 2.5-");
         CREATE_CODEC = null;
         CREATE_ENCRYPTOR = null;
         CREATE_DECRYPTOR = null;
@@ -329,62 +334,53 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     };
   }
 
-  private static SaslAdaptor createSaslAdaptor25() {
-    try {
-      final Field trustedChannelResolverField = DFSClient.class
-          .getDeclaredField("trustedChannelResolver");
-      trustedChannelResolverField.setAccessible(true);
-      final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
-      return new SaslAdaptor() {
-
-        @Override
-        public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
-          try {
-            return (TrustedChannelResolver) trustedChannelResolverField.get(client);
-          } catch (IllegalAccessException e) {
-            throw new RuntimeException(e);
-          }
-        }
+  private static SaslAdaptor createSaslAdaptor25()
+      throws NoSuchFieldException, NoSuchMethodException {
+    final Field trustedChannelResolverField = DFSClient.class
+        .getDeclaredField("trustedChannelResolver");
+    trustedChannelResolverField.setAccessible(true);
+    final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
+    return new SaslAdaptor() {
 
-        @Override
-        public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
-          return null;
+      @Override
+      public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
+        try {
+          return (TrustedChannelResolver) trustedChannelResolverField.get(client);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
         }
+      }
 
-        @Override
-        public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
-          return null;
-        }
+      @Override
+      public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
+        return null;
+      }
 
-        @Override
-        public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
-          try {
-            return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
-          } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      };
-    } catch (NoSuchFieldException | NoSuchMethodException e) {
-      throw new Error(e);
-    }
+      @Override
+      public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
+        return null;
+      }
 
+      @Override
+      public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
+        try {
+          return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
   }
 
-  private static SaslAdaptor createSaslAdaptor() {
-    Class<?> saslDataTransferClientClass = null;
+  private static SaslAdaptor createSaslAdaptor()
+      throws NoSuchFieldException, NoSuchMethodException {
     try {
-      saslDataTransferClientClass = Class
-          .forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient");
+      return createSaslAdaptor27(
+        Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"));
     } catch (ClassNotFoundException e) {
-      LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-");
-    }
-    try {
-      return saslDataTransferClientClass != null ? createSaslAdaptor27(saslDataTransferClientClass)
-          : createSaslAdaptor25();
-    } catch (NoSuchFieldException | NoSuchMethodException e) {
-      throw new Error(e);
+      LOG.debug("No SaslDataTransferClient class found, should be hadoop 2.5-", e);
     }
+    return createSaslAdaptor25();
   }
 
   private static CipherOptionHelper createCipherHelper25() {
@@ -451,9 +447,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
     final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
 
-    final Method convertCipherOptionsMethod = PBHelper.class.getMethod("convertCipherOptions",
+    Class<?> pbHelperClass;
+    try {
+      pbHelperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
+    } catch (ClassNotFoundException e) {
+      LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
+      pbHelperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+    }
+    final Method convertCipherOptionsMethod = pbHelperClass.getMethod("convertCipherOptions",
       List.class);
-    final Method convertCipherOptionProtosMethod = PBHelper.class
+    final Method convertCipherOptionProtosMethod = pbHelperClass
         .getMethod("convertCipherOptionProtos", List.class);
     final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class
         .getMethod("addAllCipherOption", Iterable.class);
@@ -577,19 +580,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     };
   }
 
-  private static CipherOptionHelper createCipherHelper() {
+  private static CipherOptionHelper createCipherHelper()
+      throws ClassNotFoundException, NoSuchMethodException {
     Class<?> cipherOptionClass;
     try {
       cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption");
     } catch (ClassNotFoundException e) {
-      LOG.warn("No CipherOption class found, should be hadoop 2.5-");
+      LOG.debug("No CipherOption class found, should be hadoop 2.5-", e);
       return createCipherHelper25();
     }
-    try {
-      return createCipherHelper27(cipherOptionClass);
-    } catch (NoSuchMethodException | ClassNotFoundException e) {
-      throw new Error(e);
-    }
+    return createCipherHelper27(cipherOptionClass);
   }
 
   private static TransparentCryptoHelper createTransparentCryptoHelper25() {
@@ -646,25 +646,30 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     };
   }
 
-  private static TransparentCryptoHelper createTransparentCryptoHelper() {
+  private static TransparentCryptoHelper createTransparentCryptoHelper()
+      throws NoSuchMethodException, ClassNotFoundException {
     Class<?> feInfoClass;
     try {
       feInfoClass = Class.forName("org.apache.hadoop.fs.FileEncryptionInfo");
     } catch (ClassNotFoundException e) {
-      LOG.warn("No FileEncryptionInfo class found, should be hadoop 2.5-");
+      LOG.debug("No FileEncryptionInfo class found, should be hadoop 2.5-", e);
       return createTransparentCryptoHelper25();
     }
-    try {
-      return createTransparentCryptoHelper27(feInfoClass);
-    } catch (NoSuchMethodException | ClassNotFoundException e) {
-      throw new Error(e);
-    }
+    return createTransparentCryptoHelper27(feInfoClass);
   }
 
   static {
-    SASL_ADAPTOR = createSaslAdaptor();
-    CIPHER_OPTION_HELPER = createCipherHelper();
-    TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
+    try {
+      SASL_ADAPTOR = createSaslAdaptor();
+      CIPHER_OPTION_HELPER = createCipherHelper();
+      TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
+    } catch (Exception e) {
+      final String msg = "Couldn't properly initialize access to HDFS internals. Please "
+          + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+          + "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
+    }
   }
 
   /**
@@ -828,40 +833,40 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
         byte[] challenge = proto.getPayload().toByteArray();
         byte[] response = saslClient.evaluateChallenge(challenge);
         switch (step) {
-          case 1: {
-            List<Object> cipherOptions = null;
-            if (requestedQopContainsPrivacy()) {
-              cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf);
-            }
-            sendSaslMessage(ctx, response, cipherOptions);
-            ctx.flush();
-            step++;
-            break;
+        case 1: {
+          List<Object> cipherOptions = null;
+          if (requestedQopContainsPrivacy()) {
+            cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf);
           }
-          case 2: {
-            assert response == null;
-            checkSaslComplete();
-            Object cipherOption =
-                CIPHER_OPTION_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
-            ChannelPipeline p = ctx.pipeline();
-            while (p.first() != null) {
-              p.removeFirst();
-            }
-            if (cipherOption != null) {
-              CryptoCodec codec = new CryptoCodec(conf, cipherOption);
-              p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
-            } else {
-              if (useWrap()) {
-                p.addLast(new SaslWrapHandler(saslClient),
-                  new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
-                  new SaslUnwrapHandler(saslClient));
-              }
+          sendSaslMessage(ctx, response, cipherOptions);
+          ctx.flush();
+          step++;
+          break;
+        }
+        case 2: {
+          assert response == null;
+          checkSaslComplete();
+          Object cipherOption = CIPHER_OPTION_HELPER.getCipherOption(proto,
+            isNegotiatedQopPrivacy(), saslClient);
+          ChannelPipeline p = ctx.pipeline();
+          while (p.first() != null) {
+            p.removeFirst();
+          }
+          if (cipherOption != null) {
+            CryptoCodec codec = new CryptoCodec(conf, cipherOption);
+            p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
+          } else {
+            if (useWrap()) {
+              p.addLast(new SaslWrapHandler(saslClient),
+                new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
+                new SaslUnwrapHandler(saslClient));
             }
-            promise.trySuccess(null);
-            break;
           }
-          default:
-            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+          promise.trySuccess(null);
+          break;
+        }
+        default:
+          throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
         }
       } else {
         ctx.fireChannelRead(msg);


Mime
View raw message