hbase-commits mailing list archives

From: ecl...@apache.org
Subject: [32/50] [abbrv] hbase git commit: HBASE-15628 Implement an AsyncOutputStream which can work with any FileSystem implementation
Date: Mon, 25 Apr 2016 21:13:16 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
new file mode 100644
index 0000000..870262e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -0,0 +1,1033 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static io.netty.handler.timeout.IdleState.READER_IDLE;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
+
+  private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputSaslHelper.class);
+
+  private FanOutOneBlockAsyncDFSOutputSaslHelper() {
+  }
+
+  private static final String SERVER_NAME = "0";
+  private static final String PROTOCOL = "hdfs";
+  private static final String MECHANISM = "DIGEST-MD5";
+  private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+  private static final String NAME_DELIMITER = " ";
+  private static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
+      "dfs.encrypt.data.transfer.cipher.suites";
+  private static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding";
+
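+  /**
+   * Gives uniform access to the SASL related internals of {@link DFSClient}. The concrete
+   * implementation is chosen once, at class load time, because the relevant fields and methods
+   * differ between Hadoop versions and have to be resolved reflectively.
+   */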
+  private interface SaslAdaptor {
+
+    SaslPropertiesResolver getSaslPropsResolver(DFSClient client);
+
+    TrustedChannelResolver getTrustedChannelResolver(DFSClient client);
+
+    AtomicBoolean getFallbackToSimpleAuth(DFSClient client);
+
+    DataEncryptionKey createDataEncryptionKey(DFSClient client);
+  }
+
+  private static final SaslAdaptor SASL_ADAPTOR;
+
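+  /**
+   * Reflective access to the CipherOption based AES negotiation APIs. On Hadoop versions that do
+   * not ship these classes a stub implementation that negotiates no cipher is used, so only SASL
+   * wrap/unwrap protection is available there.
+   */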
+  private interface CipherHelper {
+
+    List<Object> getCipherOptions(Configuration conf) throws IOException;
+
+    void addCipherOptions(DataTransferEncryptorMessageProto.Builder builder,
+        List<Object> cipherOptions);
+
+    Object getCipherOption(DataTransferEncryptorMessageProto proto, boolean isNegotiatedQopPrivacy,
+        SaslClient saslClient) throws IOException;
+
+    Object getCipherSuite(Object cipherOption);
+
+    byte[] getInKey(Object cipherOption);
+
+    byte[] getInIv(Object cipherOption);
+
+    byte[] getOutKey(Object cipherOption);
+
+    byte[] getOutIv(Object cipherOption);
+  }
+
+  private static final CipherHelper CIPHER_HELPER;
+
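+  /**
+   * Thin reflective wrapper around org.apache.hadoop.crypto.CryptoCodec and its Encryptor and
+   * Decryptor, so that this class still loads on Hadoop versions that do not ship those classes.
+   */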
+  private static final class CryptoCodec {
+
+    private static final Method CREATE_CODEC;
+
+    private static final Method CREATE_ENCRYPTOR;
+
+    private static final Method CREATE_DECRYPTOR;
+
+    private static final Method INIT_ENCRYPTOR;
+
+    private static final Method INIT_DECRYPTOR;
+
+    private static final Method ENCRYPT;
+
+    private static final Method DECRYPT;
+
+    static {
+      Class<?> cryptoCodecClass = null;
+      try {
+        cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec");
+      } catch (ClassNotFoundException e) {
+        LOG.warn("No CryptoCodec class found, should be hadoop 2.5-", e);
+      }
+      if (cryptoCodecClass != null) {
+        Method getInstanceMethod = null;
+        for (Method method : cryptoCodecClass.getMethods()) {
+          if (method.getName().equals("getInstance") && method.getParameterTypes().length == 2) {
+            getInstanceMethod = method;
+            break;
+          }
+        }
+        CREATE_CODEC = getInstanceMethod;
+        try {
+          CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor");
+          CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor");
+
+          Class<?> encryptorClass = Class.forName("org.apache.hadoop.crypto.Encryptor");
+          INIT_ENCRYPTOR = encryptorClass.getMethod("init");
+          ENCRYPT = encryptorClass.getMethod("encrypt", ByteBuffer.class, ByteBuffer.class);
+
+          Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor");
+          INIT_DECRYPTOR = decryptorClass.getMethod("init");
+          DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class);
+        } catch (NoSuchMethodException | ClassNotFoundException e) {
+          throw new Error(e);
+        }
+      } else {
+        LOG.warn("Can not initialize CryptoCodec, should be hadoop 2.5-");
+        CREATE_CODEC = null;
+        CREATE_ENCRYPTOR = null;
+        CREATE_DECRYPTOR = null;
+        INIT_ENCRYPTOR = null;
+        INIT_DECRYPTOR = null;
+        ENCRYPT = null;
+        DECRYPT = null;
+      }
+    }
+
+    private final Object encryptor;
+
+    private final Object decryptor;
+
+    public CryptoCodec(Configuration conf, Object cipherOption) {
+      Object codec;
+      try {
+        codec = CREATE_CODEC.invoke(null, conf, CIPHER_HELPER.getCipherSuite(cipherOption));
+        encryptor = CREATE_ENCRYPTOR.invoke(codec);
+        byte[] encKey = CIPHER_HELPER.getInKey(cipherOption);
+        byte[] encIv = CIPHER_HELPER.getInIv(cipherOption);
+        INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length));
+
+        decryptor = CREATE_DECRYPTOR.invoke(codec);
+        byte[] decKey = CIPHER_HELPER.getOutKey(cipherOption);
+        byte[] decIv = CIPHER_HELPER.getOutIv(cipherOption);
+        INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length));
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
+      try {
+        ENCRYPT.invoke(encryptor, inBuffer, outBuffer);
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
+      try {
+        DECRYPT.invoke(decryptor, inBuffer, outBuffer);
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static SaslAdaptor createSaslAdaptor27(Class<?> saslDataTransferClientClass)
+      throws NoSuchFieldException, NoSuchMethodException {
+    final Field saslPropsResolverField =
+        saslDataTransferClientClass.getDeclaredField("saslPropsResolver");
+    saslPropsResolverField.setAccessible(true);
+    final Field trustedChannelResolverField =
+        saslDataTransferClientClass.getDeclaredField("trustedChannelResolver");
+    trustedChannelResolverField.setAccessible(true);
+    final Field fallbackToSimpleAuthField =
+        saslDataTransferClientClass.getDeclaredField("fallbackToSimpleAuth");
+    fallbackToSimpleAuthField.setAccessible(true);
+    final Method getSaslDataTransferClientMethod =
+        DFSClient.class.getMethod("getSaslDataTransferClient");
+    final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey");
+    return new SaslAdaptor() {
+
+      @Override
+      public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
+        try {
+          return (TrustedChannelResolver) trustedChannelResolverField
+              .get(getSaslDataTransferClientMethod.invoke(client));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
+        try {
+          return (SaslPropertiesResolver) saslPropsResolverField
+              .get(getSaslDataTransferClientMethod.invoke(client));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
+        try {
+          return (AtomicBoolean) fallbackToSimpleAuthField.get(getSaslDataTransferClientMethod
+              .invoke(client));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
+        try {
+          return (DataEncryptionKey) newDataEncryptionKeyMethod.invoke(client);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static SaslAdaptor createSaslAdaptor25() {
+    try {
+      final Field trustedChannelResolverField =
+          DFSClient.class.getDeclaredField("trustedChannelResolver");
+      trustedChannelResolverField.setAccessible(true);
+      final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey");
+      return new SaslAdaptor() {
+
+        @Override
+        public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) {
+          try {
+            return (TrustedChannelResolver) trustedChannelResolverField.get(client);
+          } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+        @Override
+        public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) {
+          return null;
+        }
+
+        @Override
+        public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
+          return null;
+        }
+
+        @Override
+        public DataEncryptionKey createDataEncryptionKey(DFSClient client) {
+          try {
+            return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client);
+          } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    } catch (NoSuchFieldException | NoSuchMethodException e) {
+      throw new Error(e);
+    }
+
+  }
+
+  private static SaslAdaptor createSaslAdaptor() {
+    Class<?> saslDataTransferClientClass = null;
+    try {
+      saslDataTransferClientClass =
+          Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient");
+    } catch (ClassNotFoundException e) {
+      LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-");
+    }
+    try {
+      return saslDataTransferClientClass != null ? createSaslAdaptor27(saslDataTransferClientClass)
+          : createSaslAdaptor25();
+    } catch (NoSuchFieldException | NoSuchMethodException e) {
+      throw new Error(e);
+    }
+  }
+
+  private static CipherHelper createCipherHelper25() {
+    return new CipherHelper() {
+
+      @Override
+      public byte[] getOutKey(Object cipherOption) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public byte[] getOutIv(Object cipherOption) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public byte[] getInKey(Object cipherOption) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public byte[] getInIv(Object cipherOption) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Object getCipherSuite(Object cipherOption) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public List<Object> getCipherOptions(Configuration conf) {
+        return null;
+      }
+
+      @Override
+      public Object getCipherOption(DataTransferEncryptorMessageProto proto,
+          boolean isNegotiatedQopPrivacy, SaslClient saslClient) {
+        return null;
+      }
+
+      @Override
+      public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  private static CipherHelper createCipherHelper27(Class<?> cipherOptionClass)
+      throws ClassNotFoundException, NoSuchMethodException {
+    @SuppressWarnings("rawtypes")
+    Class<? extends Enum> cipherSuiteClass =
+        Class.forName("org.apache.hadoop.crypto.CipherSuite").asSubclass(Enum.class);
+    @SuppressWarnings("unchecked")
+    final Enum<?> aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING");
+    final Constructor<?> cipherOptionConstructor =
+        cipherOptionClass.getConstructor(cipherSuiteClass);
+    final Constructor<?> cipherOptionWithKeyAndIvConstructor =
+        cipherOptionClass.getConstructor(cipherSuiteClass, byte[].class, byte[].class,
+          byte[].class, byte[].class);
+
+    final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite");
+    final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey");
+    final Method getInIvMethod = cipherOptionClass.getMethod("getInIv");
+    final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
+    final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
+
+    final Method convertCipherOptionsMethod =
+        PBHelper.class.getMethod("convertCipherOptions", List.class);
+    final Method convertCipherOptionProtosMethod =
+        PBHelper.class.getMethod("convertCipherOptionProtos", List.class);
+    final Method addAllCipherOptionMethod =
+        DataTransferEncryptorMessageProto.Builder.class.getMethod("addAllCipherOption",
+          Iterable.class);
+    final Method getCipherOptionListMethod =
+        DataTransferEncryptorMessageProto.class.getMethod("getCipherOptionList");
+    return new CipherHelper() {
+
+      @Override
+      public byte[] getOutKey(Object cipherOption) {
+        try {
+          return (byte[]) getOutKeyMethod.invoke(cipherOption);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public byte[] getOutIv(Object cipherOption) {
+        try {
+          return (byte[]) getOutIvMethod.invoke(cipherOption);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public byte[] getInKey(Object cipherOption) {
+        try {
+          return (byte[]) getInKeyMethod.invoke(cipherOption);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public byte[] getInIv(Object cipherOption) {
+        try {
+          return (byte[]) getInIvMethod.invoke(cipherOption);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public Object getCipherSuite(Object cipherOption) {
+        try {
+          return getCipherSuiteMethod.invoke(cipherOption);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public List<Object> getCipherOptions(Configuration conf) throws IOException {
+        // Negotiate cipher suites if configured. Currently, the only supported
+        // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
+        // values for future expansion.
+        String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+        if (cipherSuites == null || cipherSuites.isEmpty()) {
+          return null;
+        }
+        if (!cipherSuites.equals(AES_CTR_NOPADDING)) {
+          throw new IOException(String.format("Invalid cipher suite, %s=%s",
+            DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
+        }
+        Object option;
+        try {
+          option = cipherOptionConstructor.newInstance(aesCipherSuite);
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+        List<Object> cipherOptions = Lists.newArrayListWithCapacity(1);
+        cipherOptions.add(option);
+        return cipherOptions;
+      }
+
+      private Object unwrap(Object option, SaslClient saslClient) throws IOException {
+        byte[] inKey = getInKey(option);
+        if (inKey != null) {
+          inKey = saslClient.unwrap(inKey, 0, inKey.length);
+        }
+        byte[] outKey = getOutKey(option);
+        if (outKey != null) {
+          outKey = saslClient.unwrap(outKey, 0, outKey.length);
+        }
+        try {
+          return cipherOptionWithKeyAndIvConstructor.newInstance(getCipherSuite(option), inKey,
+            getInIv(option), outKey, getOutIv(option));
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public Object getCipherOption(DataTransferEncryptorMessageProto proto,
+          boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
+        List<Object> cipherOptions;
+        try {
+          cipherOptions =
+              (List<Object>) convertCipherOptionProtosMethod.invoke(null,
+                getCipherOptionListMethod.invoke(proto));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+        if (cipherOptions == null || cipherOptions.isEmpty()) {
+          return null;
+        }
+        Object cipherOption = cipherOptions.get(0);
+        return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
+      }
+
+      @Override
+      public void addCipherOptions(Builder builder, List<Object> cipherOptions) {
+        try {
+          addAllCipherOptionMethod.invoke(builder,
+            convertCipherOptionsMethod.invoke(null, cipherOptions));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static CipherHelper createCipherHelper() {
+    Class<?> cipherOptionClass;
+    try {
+      cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption");
+    } catch (ClassNotFoundException e) {
+      LOG.warn("No CipherOption class found, should be hadoop 2.5-");
+      return createCipherHelper25();
+    }
+    try {
+      return createCipherHelper27(cipherOptionClass);
+    } catch (NoSuchMethodException | ClassNotFoundException e) {
+      throw new Error(e);
+    }
+  }
+
+  static {
+    SASL_ADAPTOR = createSaslAdaptor();
+    CIPHER_HELPER = createCipherHelper();
+  }
+
+  /**
+   * Sets user name and password when asked by the client-side SASL object.
+   */
+  private static final class SaslClientCallbackHandler implements CallbackHandler {
+
+    private final char[] password;
+    private final String userName;
+
+    /**
+     * Creates a new SaslClientCallbackHandler.
+     * @param userName SASL user name
+     * @param password SASL password
+     */
+    public SaslClientCallbackHandler(String userName, char[] password) {
+      this.password = password;
+      this.userName = userName;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        pc.setPassword(password);
+      }
+      if (rc != null) {
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+
+  private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
+
+    private final Configuration conf;
+
+    private final Map<String, String> saslProps;
+
+    private final SaslClient saslClient;
+
+    private final int timeoutMs;
+
+    private final Promise<Void> promise;
+
+    private int step = 0;
+
+    public SaslNegotiateHandler(Configuration conf, String username, char[] password,
+        Map<String, String> saslProps, int timeoutMs, Promise<Void> promise) throws SaslException {
+      this.conf = conf;
+      this.saslProps = saslProps;
+      this.saslClient =
+          Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, SERVER_NAME,
+            saslProps, new SaslClientCallbackHandler(username, password));
+      this.timeoutMs = timeoutMs;
+      this.promise = promise;
+    }
+
+    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
+      sendSaslMessage(ctx, payload, null);
+    }
+
+    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List<Object> options)
+        throws IOException {
+      DataTransferEncryptorMessageProto.Builder builder =
+          DataTransferEncryptorMessageProto.newBuilder();
+      builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+      if (payload != null) {
+        builder.setPayload(ByteString.copyFrom(payload));
+      }
+      if (options != null) {
+        CIPHER_HELPER.addCipherOptions(builder, options);
+      }
+      DataTransferEncryptorMessageProto proto = builder.build();
+      int size = proto.getSerializedSize();
+      size += CodedOutputStream.computeRawVarint32Size(size);
+      ByteBuf buf = ctx.alloc().buffer(size);
+      proto.writeDelimitedTo(new ByteBufOutputStream(buf));
+      ctx.write(buf);
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
+      sendSaslMessage(ctx, new byte[0]);
+      ctx.flush();
+      step++;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      saslClient.dispose();
+    }
+
+    private void check(DataTransferEncryptorMessageProto proto) throws IOException {
+      if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+        throw new InvalidEncryptionKeyException(proto.getMessage());
+      } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+        throw new IOException(proto.getMessage());
+      }
+    }
+
+    private String getNegotiatedQop() {
+      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    }
+
+    private boolean isNegotiatedQopPrivacy() {
+      String qop = getNegotiatedQop();
+      return qop != null && "auth-conf".equalsIgnoreCase(qop);
+    }
+
+    private boolean requestedQopContainsPrivacy() {
+      Set<String> requestedQop =
+          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      return requestedQop.contains("auth-conf");
+    }
+
+    private void checkSaslComplete() throws IOException {
+      if (!saslClient.isComplete()) {
+        throw new IOException("Failed to complete SASL handshake");
+      }
+      Set<String> requestedQop =
+          ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      String negotiatedQop = getNegotiatedQop();
+      LOG.debug("Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = "
+          + negotiatedQop);
+      if (!requestedQop.contains(negotiatedQop)) {
+        throw new IOException(String.format("SASL handshake completed, but "
+            + "channel does not have acceptable quality of protection, "
+            + "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
+      }
+    }
+
+    private boolean useWrap() {
+      String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+      return qop != null && !"auth".equalsIgnoreCase(qop);
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
+      if (msg instanceof DataTransferEncryptorMessageProto) {
+        DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
+        check(proto);
+        byte[] challenge = proto.getPayload().toByteArray();
+        byte[] response = saslClient.evaluateChallenge(challenge);
+        switch (step) {
+          case 1: {
+            List<Object> cipherOptions = null;
+            if (requestedQopContainsPrivacy()) {
+              cipherOptions = CIPHER_HELPER.getCipherOptions(conf);
+            }
+            sendSaslMessage(ctx, response, cipherOptions);
+            ctx.flush();
+            step++;
+            break;
+          }
+          case 2: {
+            assert response == null;
+            checkSaslComplete();
+            Object cipherOption =
+                CIPHER_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
+            ChannelPipeline p = ctx.pipeline();
+            while (p.first() != null) {
+              p.removeFirst();
+            }
+            if (cipherOption != null) {
+              CryptoCodec codec = new CryptoCodec(conf, cipherOption);
+              p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
+            } else {
+              if (useWrap()) {
+                p.addLast(new SaslWrapHandler(saslClient), new LengthFieldBasedFrameDecoder(
+                    Integer.MAX_VALUE, 0, 4), new SaslUnwrapHandler(saslClient));
+              }
+            }
+            promise.trySuccess(null);
+            break;
+          }
+          default:
+            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+        }
+      } else {
+        ctx.fireChannelRead(msg);
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+      promise.tryFailure(cause);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+      if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+        promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+      } else {
+        super.userEventTriggered(ctx, evt);
+      }
+    }
+  }
+
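+  /**
+   * Inbound side of SASL protection: frames are re-assembled by a LengthFieldBasedFrameDecoder,
+   * so this handler only needs to skip the 4-byte length prefix and unwrap the payload back to
+   * plaintext before passing it up the pipeline.
+   */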
+  private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    private final SaslClient saslClient;
+
+    public SaslUnwrapHandler(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      saslClient.dispose();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+      msg.skipBytes(4);
+      byte[] b = new byte[msg.readableBytes()];
+      msg.readBytes(b);
+      ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
+    }
+  }
+
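+  /**
+   * Outbound counterpart of {@link SaslUnwrapHandler}: writes are buffered until flush, wrapped
+   * with {@link SaslClient#wrap(byte[], int, int)}, and sent as a 4-byte length followed by the
+   * wrapped bytes, matching the framing expected by the peer.
+   */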
+  private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
+
+    private final SaslClient saslClient;
+
+    private CompositeByteBuf cBuf;
+
+    public SaslWrapHandler(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+        throws Exception {
+      if (msg instanceof ByteBuf) {
+        ByteBuf buf = (ByteBuf) msg;
+        cBuf.addComponent(buf);
+        cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
+      } else {
+        ctx.write(msg);
+      }
+    }
+
+    @Override
+    public void flush(ChannelHandlerContext ctx) throws Exception {
+      if (cBuf.isReadable()) {
+        byte[] b = new byte[cBuf.readableBytes()];
+        cBuf.readBytes(b);
+        cBuf.discardReadComponents();
+        byte[] wrapped = saslClient.wrap(b, 0, b.length);
+        ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
+        buf.writeInt(wrapped.length);
+        buf.writeBytes(wrapped);
+        ctx.write(buf);
+      }
+      ctx.flush();
+    }
+
+    @Override
+    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+      cBuf.release();
+      cBuf = null;
+    }
+  }
+
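+  /**
+   * When a cipher option has been negotiated, {@link DecryptHandler} and {@link EncryptHandler}
+   * are installed instead of the SASL wrap/unwrap handlers and apply AES to the stream through
+   * the reflective {@link CryptoCodec} wrapper above.
+   */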
+  private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    private final CryptoCodec codec;
+
+    public DecryptHandler(CryptoCodec codec) {
+      this.codec = codec;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+      ByteBuf inBuf;
+      boolean release = false;
+      if (msg.nioBufferCount() == 1) {
+        inBuf = msg;
+      } else {
+        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+        msg.readBytes(inBuf);
+        release = true;
+      }
+      ByteBuffer inBuffer = inBuf.nioBuffer();
+      ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
+      // nioBuffer() on the freshly allocated buffer would be zero-length (its writerIndex is
+      // still 0), so expose the full capacity needed for the decrypted bytes explicitly.
+      ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes());
+      codec.decrypt(inBuffer, outBuffer);
+      outBuf.writerIndex(inBuf.readableBytes());
+      if (release) {
+        inBuf.release();
+      }
+      ctx.fireChannelRead(outBuf);
+    }
+  }
+
+  private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
+
+    private final CryptoCodec codec;
+
+    public EncryptHandler(CryptoCodec codec) {
+      super(false);
+      this.codec = codec;
+    }
+
+    @Override
+    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
+        throws Exception {
+      if (preferDirect) {
+        return ctx.alloc().directBuffer(msg.readableBytes());
+      } else {
+        return ctx.alloc().buffer(msg.readableBytes());
+      }
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
+      ByteBuf inBuf;
+      boolean release = false;
+      if (msg.nioBufferCount() == 1) {
+        inBuf = msg;
+      } else {
+        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+        msg.readBytes(inBuf);
+        release = true;
+      }
+      ByteBuffer inBuffer = inBuf.nioBuffer();
+      // As in DecryptHandler, expose the destination capacity explicitly; nioBuffer() on the
+      // freshly allocated output buffer would be empty.
+      ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes());
+      codec.encrypt(inBuffer, outBuffer);
+      out.writerIndex(inBuf.readableBytes());
+      if (release) {
+        inBuf.release();
+      }
+    }
+  }
+
+  private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
+    return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
+        + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
+  }
+
+  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+    return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
+  }
+
+  private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8);
+  }
+
+  private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8)
+        .toCharArray();
+  }
+
+  private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
+    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
+    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+    return saslProps;
+  }
+
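+  /**
+   * Installs the negotiation handlers on the connection to the datanode: an idle handler that
+   * enforces the read timeout, protobuf varint framing plus decoding for
+   * DataTransferEncryptorMessageProto, and the {@link SaslNegotiateHandler} that drives the
+   * handshake and completes the given promise.
+   */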
+  private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
+      String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise) {
+    try {
+      channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
+        new ProtobufVarint32FrameDecoder(),
+        new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
+        new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise));
+    } catch (SaslException e) {
+      saslPromise.tryFailure(e);
+    }
+  }
+
+  static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+      int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
+      Promise<Void> saslPromise) {
+    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(client);
+    TrustedChannelResolver trustedChannelResolver = SASL_ADAPTOR.getTrustedChannelResolver(client);
+    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(client);
+    InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
+    if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
+      saslPromise.trySuccess(null);
+      return;
+    }
+    DataEncryptionKey encryptionKey;
+    try {
+      encryptionKey = SASL_ADAPTOR.createDataEncryptionKey(client);
+    } catch (Exception e) {
+      saslPromise.tryFailure(e);
+      return;
+    }
+    if (encryptionKey != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = "
+            + dnInfo);
+      }
+      doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
+        encryptionKeyToPassword(encryptionKey.encryptionKey),
+        createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise);
+    } else if (!UserGroupInformation.isSecurityEnabled()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
+            + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (dnInfo.getXferPort() < 1024) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with "
+            + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with "
+            + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (saslPropsResolver != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client doing general handshake for addr = " + addr + ", datanodeId = "
+            + dnInfo);
+      }
+      doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
+        buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise);
+    } else {
+      // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
+      // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
+      // edge case.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
+            + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    }
+  }
+
+}
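
The helper above never links directly against the newer Hadoop security classes
(SaslDataTransferClient, CipherOption, CryptoCodec); it probes for them once in static
initializers and routes every call through small adaptor interfaces. A minimal, self-contained
sketch of that pattern (the class name probed for below is illustrative, not one of the actual
Hadoop classes):

import java.lang.reflect.Method;

public final class VersionProbeSketch {

  /** The operations whose implementation depends on which API version is on the classpath. */
  interface Adaptor {
    String describe();
  }

  // Resolved exactly once, so callers never pay for reflective lookups on the hot path.
  private static final Adaptor ADAPTOR = createAdaptor();

  private static Adaptor createAdaptor() {
    Class<?> newApi;
    try {
      // Hypothetical class that only exists in newer versions of the dependency.
      newApi = Class.forName("org.example.NewApiClient");
    } catch (ClassNotFoundException e) {
      // Older dependency on the classpath: fall back to a legacy adaptor.
      return new Adaptor() {
        @Override
        public String describe() {
          return "legacy adaptor";
        }
      };
    }
    final Method[] methods = newApi.getMethods();
    return new Adaptor() {
      @Override
      public String describe() {
        return "new adaptor with " + methods.length + " reflectively resolved methods";
      }
    };
  }

  public static void main(String[] args) {
    System.out.println(ADAPTOR.describe());
  }
}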

http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index b80f2c9..d5bccf0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT;
-import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
@@ -37,8 +40,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,8 +49,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
-import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutput;
-import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
+import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
+import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
@@ -209,7 +210,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       .newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
           .setNameFormat("Close-WAL-Writer-%d").build());
 
-  private volatile FanOutOneBlockAsyncDFSOutput hdfsOut;
+  private volatile AsyncFSOutput fsOut;
 
   private final Deque<FSWALEntry> waitingAppendEntries = new ArrayDeque<FSWALEntry>();
 
@@ -663,7 +664,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     final AsyncWriter oldWriter = this.writer;
     this.writer = nextWriter;
     if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
-      this.hdfsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
+      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
     }
     this.fileLengthAtLastSync = 0L;
     boolean scheduleTask;
@@ -721,7 +722,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   @Override
   DatanodeInfo[] getPipeline() {
-    FanOutOneBlockAsyncDFSOutput output = this.hdfsOut;
+    AsyncFSOutput output = this.fsOut;
     return output != null ? output.getPipeline() : new DatanodeInfo[0];
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index 894f3dd..886b172 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import com.google.common.base.Throwables;
+import com.google.common.primitives.Ints;
+
+import io.netty.channel.EventLoop;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
@@ -29,18 +34,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
+import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
-import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutput;
-import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-
-import com.google.common.base.Throwables;
-import com.google.common.primitives.Ints;
-
-import io.netty.channel.EventLoop;
 
 /**
  * AsyncWriter for protobuf-based WAL.
@@ -97,7 +96,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements
 
   private final EventLoop eventLoop;
 
-  private FanOutOneBlockAsyncDFSOutput output;
+  private AsyncFSOutput output;
 
   private ByteArrayOutputStream buf;
 
@@ -149,16 +148,15 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements
     this.output = null;
   }
 
-  public FanOutOneBlockAsyncDFSOutput getOutput() {
+  public AsyncFSOutput getOutput() {
     return this.output;
   }
 
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
       short replication, long blockSize) throws IOException {
-    this.output =
-        FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, path,
-          overwritable, false, replication, blockSize, eventLoop);
+    this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
+      blockSize, eventLoop);
     this.buf = new ByteArrayOutputStream();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java
deleted file mode 100644
index bdbf865..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java
+++ /dev/null
@@ -1,533 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.util;
-
-import static io.netty.handler.timeout.IdleState.READER_IDLE;
-import static io.netty.handler.timeout.IdleState.WRITER_IDLE;
-import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
-import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
-import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
-import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoop;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.channels.CompletionHandler;
-import java.util.ArrayDeque;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Supplier;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.util.DataChecksum;
-
-/**
- * An asynchronous HDFS output stream implementation which fans out data to datanode and only
- * supports writing file with only one block.
- * <p>
- * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create. The mainly
- * usage of this class is implementing WAL, so we only expose a little HDFS configurations in the
- * method. And we place it here under util package because we want to make it independent of WAL
- * implementation thus easier to move it to HDFS project finally.
- * <p>
- * Note that, all connections to datanode will run in the same {@link EventLoop} which means we only
- * need one thread here. But be careful, we do some blocking operations in {@link #close()} and
- * {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside
- * {@link EventLoop}. And for {@link #write(byte[])} {@link #write(byte[], int, int)},
- * {@link #buffered()} and {@link #flush(Object, CompletionHandler, boolean)}, if you call them
- * outside {@link EventLoop}, there will be an extra context-switch.
- * <p>
- * Advantages compare to DFSOutputStream:
- * <ol>
- * <li>The fan out mechanism. This will reduce the latency.</li>
- * <li>The asynchronous WAL could also run in the same EventLoop, we could just call write and flush
- * inside the EventLoop thread, so generally we only have one thread to do all the things.</li>
- * <li>Fail-fast when connection to datanode error. The WAL implementation could open new writer
- * ASAP.</li>
- * <li>We could benefit from netty's ByteBuf management mechanism.</li>
- * </ol>
- */
-@InterfaceAudience.Private
-public class FanOutOneBlockAsyncDFSOutput implements Closeable {
-
-  private final Configuration conf;
-
-  private final FSUtils fsUtils;
-
-  private final DistributedFileSystem dfs;
-
-  private final DFSClient client;
-
-  private final ClientProtocol namenode;
-
-  private final String clientName;
-
-  private final String src;
-
-  private final long fileId;
-
-  private final LocatedBlock locatedBlock;
-
-  private final EventLoop eventLoop;
-
-  private final List<Channel> datanodeList;
-
-  private final DataChecksum summer;
-
-  private final ByteBufAllocator alloc;
-
-  private static final class Callback {
-
-    public final Promise<Void> promise;
-
-    public final long ackedLength;
-
-    public final Set<Channel> unfinishedReplicas;
-
-    public Callback(Promise<Void> promise, long ackedLength, Collection<Channel> replicas) {
-      this.promise = promise;
-      this.ackedLength = ackedLength;
-      if (replicas.isEmpty()) {
-        this.unfinishedReplicas = Collections.emptySet();
-      } else {
-        this.unfinishedReplicas = Collections
-            .newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size()));
-        this.unfinishedReplicas.addAll(replicas);
-      }
-    }
-  }
-
-  private final Deque<Callback> waitingAckQueue = new ArrayDeque<>();
-
-  // this could be different from acked block length because a packet can not start at the middle of
-  // a chunk.
-  private long nextPacketOffsetInBlock = 0L;
-
-  private long nextPacketSeqno = 0L;
-
-  private ByteBuf buf;
-
-  private enum State {
-    STREAMING, CLOSING, BROKEN, CLOSED
-  }
-
-  private State state;
-
-  private void completed(Channel channel) {
-    if (waitingAckQueue.isEmpty()) {
-      return;
-    }
-    for (Callback c : waitingAckQueue) {
-      if (c.unfinishedReplicas.remove(channel)) {
-        if (c.unfinishedReplicas.isEmpty()) {
-          c.promise.trySuccess(null);
-          // since we will remove the Callback entry from waitingAckQueue if its unfinishedReplicas
-          // is empty, so this could only happen at the head of waitingAckQueue, so we just call
-          // removeFirst here.
-          waitingAckQueue.removeFirst();
-          // also wake up flush requests which have the same length.
-          for (Callback cb; (cb = waitingAckQueue.peekFirst()) != null;) {
-            if (cb.ackedLength == c.ackedLength) {
-              cb.promise.trySuccess(null);
-              waitingAckQueue.removeFirst();
-            } else {
-              break;
-            }
-          }
-        }
-        return;
-      }
-    }
-  }
-
-  private void failed(Channel channel, Supplier<Throwable> errorSupplier) {
-    if (state == State.BROKEN || state == State.CLOSED) {
-      return;
-    }
-    if (state == State.CLOSING) {
-      Callback c = waitingAckQueue.peekFirst();
-      if (c == null || !c.unfinishedReplicas.contains(channel)) {
-        // nothing, the endBlock request has already finished.
-        return;
-      }
-    }
-    // disable further write, and fail all pending ack.
-    state = State.BROKEN;
-    Throwable error = errorSupplier.get();
-    for (Callback c : waitingAckQueue) {
-      c.promise.tryFailure(error);
-    }
-    waitingAckQueue.clear();
-    for (Channel ch : datanodeList) {
-      ch.close();
-    }
-  }
-
-  private void setupReceiver(final int timeoutMs) {
-    SimpleChannelInboundHandler<PipelineAckProto> ackHandler = new SimpleChannelInboundHandler<PipelineAckProto>() {
-
-      @Override
-      public boolean isSharable() {
-        return true;
-      }
-
-      @Override
-      protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack)
-          throws Exception {
-        final Status reply = getStatus(ack);
-        if (reply != Status.SUCCESS) {
-          failed(ctx.channel(), new Supplier<Throwable>() {
-
-            @Override
-            public Throwable get() {
-              return new IOException("Bad response " + reply + " for block "
-                  + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
-            }
-          });
-          return;
-        }
-        if (PipelineAck.isRestartOOBStatus(reply)) {
-          failed(ctx.channel(), new Supplier<Throwable>() {
-
-            @Override
-            public Throwable get() {
-              return new IOException("Restart response " + reply + " for block "
-                  + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
-            }
-          });
-          return;
-        }
-        if (ack.getSeqno() == HEART_BEAT_SEQNO) {
-          return;
-        }
-        completed(ctx.channel());
-      }
-
-      @Override
-      public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
-        failed(ctx.channel(), new Supplier<Throwable>() {
-
-          @Override
-          public Throwable get() {
-            return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed");
-          }
-        });
-      }
-
-      @Override
-      public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause)
-          throws Exception {
-        failed(ctx.channel(), new Supplier<Throwable>() {
-
-          @Override
-          public Throwable get() {
-            return cause;
-          }
-        });
-      }
-
-      @Override
-      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-        if (evt instanceof IdleStateEvent) {
-          IdleStateEvent e = (IdleStateEvent) evt;
-          if (e.state() == READER_IDLE) {
-            failed(ctx.channel(), new Supplier<Throwable>() {
-
-              @Override
-              public Throwable get() {
-                return new IOException("Timeout(" + timeoutMs + "ms) waiting for response");
-              }
-            });
-          } else if (e.state() == WRITER_IDLE) {
-            PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
-            int len = heartbeat.getSerializedSize();
-            ByteBuf buf = alloc.buffer(len);
-            heartbeat.putInBuffer(buf.nioBuffer(0, len));
-            buf.writerIndex(len);
-            ctx.channel().writeAndFlush(buf);
-          }
-          return;
-        }
-        super.userEventTriggered(ctx, evt);
-      }
-
-    };
-    for (Channel ch : datanodeList) {
-      ch.pipeline().addLast(
-        new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
-        new ProtobufVarint32FrameDecoder(),
-        new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler);
-      ch.config().setAutoRead(true);
-    }
-  }
-
-  FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs,
-      DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
-      LocatedBlock locatedBlock, EventLoop eventLoop, List<Channel> datanodeList,
-      DataChecksum summer, ByteBufAllocator alloc) {
-    this.conf = conf;
-    this.fsUtils = fsUtils;
-    this.dfs = dfs;
-    this.client = client;
-    this.namenode = namenode;
-    this.fileId = fileId;
-    this.clientName = clientName;
-    this.src = src;
-    this.locatedBlock = locatedBlock;
-    this.eventLoop = eventLoop;
-    this.datanodeList = datanodeList;
-    this.summer = summer;
-    this.alloc = alloc;
-    this.buf = alloc.directBuffer();
-    this.state = State.STREAMING;
-    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT));
-  }
-
-  /**
-   * Just call write(b, 0, b.length).
-   * @see #write(byte[], int, int)
-   */
-  public void write(byte[] b) {
-    write(b, 0, b.length);
-  }
-
-  /**
-   * Copy the data into the buffer. Note that you need to call
-   * {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually.
-   */
-  public void write(final byte[] b, final int off, final int len) {
-    if (eventLoop.inEventLoop()) {
-      buf.ensureWritable(len).writeBytes(b, off, len);
-    } else {
-      eventLoop.submit(new Runnable() {
-
-        @Override
-        public void run() {
-          buf.ensureWritable(len).writeBytes(b, off, len);
-        }
-      }).syncUninterruptibly();
-    }
-  }
-
-  /**
-   * Return the current size of buffered data.
-   */
-  public int buffered() {
-    if (eventLoop.inEventLoop()) {
-      return buf.readableBytes();
-    } else {
-      return eventLoop.submit(new Callable<Integer>() {
-
-        @Override
-        public Integer call() throws Exception {
-          return buf.readableBytes();
-        }
-      }).syncUninterruptibly().getNow().intValue();
-    }
-  }
-
-  public DatanodeInfo[] getPipeline() {
-    return locatedBlock.getLocations();
-  }
-
-  private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler,
-      boolean syncBlock) {
-    if (state != State.STREAMING) {
-      handler.failed(new IOException("stream already broken"), attachment);
-      return;
-    }
-    int dataLen = buf.readableBytes();
-    final long ackedLength = nextPacketOffsetInBlock + dataLen;
-    if (ackedLength == locatedBlock.getBlock().getNumBytes()) {
-      // no new data, just return
-      handler.completed(locatedBlock.getBlock().getNumBytes(), attachment);
-      return;
-    }
-    Promise<Void> promise = eventLoop.newPromise();
-    promise.addListener(new FutureListener<Void>() {
-
-      @Override
-      public void operationComplete(Future<Void> future) throws Exception {
-        if (future.isSuccess()) {
-          locatedBlock.getBlock().setNumBytes(ackedLength);
-          handler.completed(ackedLength, attachment);
-        } else {
-          handler.failed(future.cause(), attachment);
-        }
-      }
-    });
-    Callback c = waitingAckQueue.peekLast();
-    if (c != null && ackedLength == c.ackedLength) {
-      // just append it to the tail of the waiting ack queue, do not issue a new hflush request.
-      waitingAckQueue
-          .addLast(new Callback(promise, ackedLength, Collections.<Channel> emptyList()));
-      return;
-    }
-    int chunkLen = summer.getBytesPerChecksum();
-    int trailingPartialChunkLen = dataLen % chunkLen;
-    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
-    int checksumLen = numChecks * summer.getChecksumSize();
-    ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
-    summer.calculateChunkedSums(buf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
-    checksumBuf.writerIndex(checksumLen);
-    PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
-        nextPacketSeqno, false, dataLen, syncBlock);
-    int headerLen = header.getSerializedSize();
-    ByteBuf headerBuf = alloc.buffer(headerLen);
-    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
-    headerBuf.writerIndex(headerLen);
-
-    waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList));
-    for (Channel ch : datanodeList) {
-      ch.write(headerBuf.duplicate().retain());
-      ch.write(checksumBuf.duplicate().retain());
-      ch.writeAndFlush(buf.duplicate().retain());
-    }
-    checksumBuf.release();
-    headerBuf.release();
-    ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen);
-    if (trailingPartialChunkLen != 0) {
-      buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen);
-    }
-    buf.release();
-    this.buf = newBuf;
-    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen;
-    nextPacketSeqno++;
-  }
-
-  /**
-   * Flush the buffer out to datanodes.
-   * @param attachment will be passed to handler when completed.
-   * @param handler will be invoked with the acked length as its result when completed.
-   * @param syncBlock will call hsync if true, otherwise hflush.
-   */
-  public <A> void flush(final A attachment, final CompletionHandler<Long, ? super A> handler,
-      final boolean syncBlock) {
-    if (eventLoop.inEventLoop()) {
-      flush0(attachment, handler, syncBlock);
-    } else {
-      eventLoop.execute(new Runnable() {
-
-        @Override
-        public void run() {
-          flush0(attachment, handler, syncBlock);
-        }
-      });
-    }
-  }
-
-  private void endBlock(Promise<Void> promise, long size) {
-    if (state != State.STREAMING) {
-      promise.tryFailure(new IOException("stream already broken"));
-      return;
-    }
-    if (!waitingAckQueue.isEmpty()) {
-      promise.tryFailure(new IllegalStateException("should call flush first before calling close"));
-      return;
-    }
-    state = State.CLOSING;
-    PacketHeader header = new PacketHeader(4, size, nextPacketSeqno, true, 0, false);
-    buf.release();
-    buf = null;
-    int headerLen = header.getSerializedSize();
-    ByteBuf headerBuf = alloc.buffer(headerLen);
-    header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
-    headerBuf.writerIndex(headerLen);
-    waitingAckQueue.add(new Callback(promise, size, datanodeList));
-    for (Channel ch : datanodeList) {
-      ch.writeAndFlush(headerBuf.duplicate().retain());
-    }
-    headerBuf.release();
-  }
-
-  /**
-   * The close method to use when an error has occurred. Currently we just call recoverFileLease.
-   */
-  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
-    assert !eventLoop.inEventLoop();
-    for (Channel ch : datanodeList) {
-      ch.closeFuture().awaitUninterruptibly();
-    }
-    endFileLease(client, src, fileId);
-    fsUtils.recoverFileLease(dfs, new Path(src), conf,
-      reporter == null ? new CancelOnClose(client) : reporter);
-  }
-
-  /**
-   * End the current block and complete the file at the namenode. You should call
-   * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
-   */
-  @Override
-  public void close() throws IOException {
-    assert !eventLoop.inEventLoop();
-    final Promise<Void> promise = eventLoop.newPromise();
-    eventLoop.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes());
-      }
-    });
-    promise.addListener(new FutureListener<Void>() {
-
-      @Override
-      public void operationComplete(Future<Void> future) throws Exception {
-        for (Channel ch : datanodeList) {
-          ch.close();
-        }
-      }
-    }).syncUninterruptibly();
-    for (Channel ch : datanodeList) {
-      ch.closeFuture().awaitUninterruptibly();
-    }
-    completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId);
-  }
-}
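To make the packet framing in flush0 concrete: with, for example, a 512-byte checksum chunk and a
4-byte checksum (illustrative defaults, not values fixed by this patch), flushing a 1300-byte buffer
gives numChecks = 3, checksumLen = 12 and a packet length field of 4 + 12 + 1300; the trailing
partial chunk of 1300 - 2*512 = 276 bytes is carried over into the new buffer so that the next
packet can start on a chunk boundary, and nextPacketOffsetInBlock advances by 1024.

For readers skimming the diff, the caller-facing surface of this class is write / buffered / flush /
close (plus recoverAndClose on error). Below is a minimal usage sketch against those methods only;
how the "out" instance is created is not shown (assume whatever factory/helper the patch provides),
and the method and variable names here are illustrative, not part of the patch.

    import java.io.IOException;
    import java.nio.channels.CompletionHandler;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.CountDownLatch;

    // Sketch only: "out" is an already-created FanOutOneBlockAsyncDFSOutput.
    static void writeFlushAndClose(FanOutOneBlockAsyncDFSOutput out)
        throws IOException, InterruptedException {
      final CountDownLatch flushed = new CountDownLatch(1);
      byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
      out.write(data); // only buffers the bytes, no network I/O yet
      out.flush(null, new CompletionHandler<Long, Void>() {

        @Override
        public void completed(Long ackedLength, Void attachment) {
          // every datanode in the pipeline has acked up to ackedLength bytes
          flushed.countDown();
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
          // the stream is broken; real callers would recoverAndClose(...) instead of close()
          flushed.countDown();
        }
      }, false); // false => hflush semantics, true => hsync
      // wait for the ack before closing: close() expects the ack queue to be drained and
      // must not be called from the event loop thread
      flushed.await();
      out.close(); // ends the block and completes the file at the namenode
    }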

