hadoop-common-commits mailing list archives

From t...@apache.org
Subject [10/11] git commit: HDFS-6134 and HADOOP-10150 subtasks.
Date Thu, 28 Aug 2014 22:07:05 GMT
HDFS-6134 and HADOOP-10150 subtasks.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c77bd85b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c77bd85b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c77bd85b

Branch: refs/heads/branch-2
Commit: c77bd85b621e23738855628230bf8db1bc5d007d
Parents: 631dea8
Author: Alejandro Abdelnur <tucu@cloudera.com>
Authored: Tue Aug 26 10:38:28 2014 -0700
Committer: Alejandro Abdelnur <tucu@apache.org>
Committed: Thu Aug 28 15:03:08 2014 -0700

----------------------------------------------------------------------
 BUILDING.txt                                    |  21 +
 .../hadoop-common/CHANGES-fs-encryption.txt     |  61 ++
 hadoop-common-project/hadoop-common/pom.xml     |  19 +-
 .../hadoop-common/src/CMakeLists.txt            |  34 +
 .../hadoop-common/src/config.h.cmake            |   1 +
 .../apache/hadoop/crypto/AesCtrCryptoCodec.java |  67 ++
 .../org/apache/hadoop/crypto/CipherSuite.java   | 115 +++
 .../org/apache/hadoop/crypto/CryptoCodec.java   | 174 +++++
 .../apache/hadoop/crypto/CryptoInputStream.java | 680 +++++++++++++++++
 .../hadoop/crypto/CryptoOutputStream.java       | 286 +++++++
 .../apache/hadoop/crypto/CryptoStreamUtils.java |  70 ++
 .../org/apache/hadoop/crypto/Decryptor.java     |  72 ++
 .../org/apache/hadoop/crypto/Encryptor.java     |  71 ++
 .../hadoop/crypto/JceAesCtrCryptoCodec.java     | 165 ++++
 .../hadoop/crypto/OpensslAesCtrCryptoCodec.java | 164 ++++
 .../org/apache/hadoop/crypto/OpensslCipher.java | 287 +++++++
 .../crypto/random/OpensslSecureRandom.java      | 119 +++
 .../hadoop/crypto/random/OsSecureRandom.java    | 115 +++
 .../hadoop/fs/CommonConfigurationKeys.java      |   1 -
 .../fs/CommonConfigurationKeysPublic.java       |  30 +
 .../apache/hadoop/fs/FSDataOutputStream.java    |   2 +-
 .../apache/hadoop/fs/FileEncryptionInfo.java    | 102 +++
 .../fs/crypto/CryptoFSDataInputStream.java      |  37 +
 .../fs/crypto/CryptoFSDataOutputStream.java     |  47 ++
 .../hadoop/fs/shell/CommandWithDestination.java |  75 +-
 .../apache/hadoop/fs/shell/CopyCommands.java    |   6 +-
 .../apache/hadoop/util/NativeCodeLoader.java    |   5 +
 .../hadoop/util/NativeLibraryChecker.java       |  21 +-
 .../org/apache/hadoop/crypto/OpensslCipher.c    | 382 ++++++++++
 .../hadoop/crypto/org_apache_hadoop_crypto.h    |  61 ++
 .../hadoop/crypto/random/OpensslSecureRandom.c  | 335 ++++++++
 .../random/org_apache_hadoop_crypto_random.h    |  40 +
 .../org/apache/hadoop/util/NativeCodeLoader.c   |  10 +
 .../src/main/resources/core-default.xml         |  69 ++
 .../src/site/apt/FileSystemShell.apt.vm         |  11 +-
 .../hadoop/crypto/CryptoStreamsTestBase.java    | 721 ++++++++++++++++++
 .../apache/hadoop/crypto/TestCryptoCodec.java   | 186 +++++
 .../apache/hadoop/crypto/TestCryptoStreams.java | 381 ++++++++++
 .../crypto/TestCryptoStreamsForLocalFS.java     | 120 +++
 .../hadoop/crypto/TestCryptoStreamsNormal.java  | 123 +++
 ...yptoStreamsWithOpensslAesCtrCryptoCodec.java |  31 +
 .../apache/hadoop/crypto/TestOpensslCipher.java | 110 +++
 .../crypto/random/TestOpensslSecureRandom.java  | 114 +++
 .../crypto/random/TestOsSecureRandom.java       | 139 ++++
 .../hadoop/util/TestNativeCodeLoader.java       |   4 +
 .../src/test/resources/testConf.xml             |  18 +-
 .../hadoop-hdfs/CHANGES-fs-encryption.txt       | 102 +++
 hadoop-hdfs-project/hadoop-hdfs/pom.xml         |   1 +
 .../hadoop-hdfs/src/main/bin/hdfs               |   5 +-
 .../main/java/org/apache/hadoop/fs/Hdfs.java    |  20 +-
 .../main/java/org/apache/hadoop/fs/XAttr.java   |  13 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 158 +++-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |   8 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  32 +-
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  38 +
 .../hadoop/hdfs/DistributedFileSystem.java      |  52 +-
 .../hdfs/UnknownCipherSuiteException.java       |  38 +
 .../org/apache/hadoop/hdfs/XAttrHelper.java     |   8 +-
 .../apache/hadoop/hdfs/client/HdfsAdmin.java    |  50 ++
 .../hadoop/hdfs/client/HdfsDataInputStream.java |  38 +-
 .../hdfs/client/HdfsDataOutputStream.java       |  36 +-
 .../hadoop/hdfs/protocol/ClientProtocol.java    |  30 +-
 .../hadoop/hdfs/protocol/EncryptionZone.java    |  79 ++
 .../hdfs/protocol/EncryptionZoneIterator.java   |  51 ++
 .../hdfs/protocol/EncryptionZoneWithId.java     |  64 ++
 .../protocol/EncryptionZoneWithIdIterator.java  |  53 ++
 .../hadoop/hdfs/protocol/HdfsFileStatus.java    |  15 +-
 .../hdfs/protocol/HdfsLocatedFileStatus.java    |   8 +-
 .../hadoop/hdfs/protocol/LocatedBlocks.java     |  23 +-
 .../protocol/SnapshottableDirectoryStatus.java  |   2 +-
 ...tNamenodeProtocolServerSideTranslatorPB.java |  56 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  81 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 105 ++-
 .../server/blockmanagement/BlockManager.java    |   9 +-
 .../hdfs/server/common/HdfsServerConstants.java |   5 +
 .../namenode/EncryptionFaultInjector.java       |  22 +
 .../server/namenode/EncryptionZoneManager.java  | 296 ++++++++
 .../hdfs/server/namenode/FSDirectory.java       | 295 +++++++-
 .../hdfs/server/namenode/FSEditLogLoader.java   |   3 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 656 ++++++++++++----
 .../hdfs/server/namenode/NameNodeRpcServer.java |  25 +-
 .../namenode/RetryStartFileException.java       |  21 +
 .../server/namenode/XAttrPermissionFilter.java  |  30 +-
 .../apache/hadoop/hdfs/tools/CryptoAdmin.java   | 301 ++++++++
 .../org/apache/hadoop/hdfs/web/JsonUtil.java    |   5 +-
 .../src/main/proto/ClientNamenodeProtocol.proto |   8 +
 .../hadoop-hdfs/src/main/proto/encryption.proto |  65 ++
 .../hadoop-hdfs/src/main/proto/hdfs.proto       |  22 +-
 .../hadoop-hdfs/src/main/proto/xattr.proto      |   1 +
 .../src/main/resources/hdfs-default.xml         |   9 +
 .../src/site/apt/ExtendedAttributes.apt.vm      |   4 +-
 .../src/site/apt/TransparentEncryption.apt.vm   | 206 +++++
 .../apache/hadoop/cli/TestCryptoAdminCLI.java   | 169 +++++
 .../hadoop/cli/util/CLICommandCryptoAdmin.java  |  21 +
 .../hadoop/cli/util/CryptoAdminCmdExecutor.java |  37 +
 .../java/org/apache/hadoop/fs/TestXAttr.java    |  11 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  68 ++
 .../hadoop/hdfs/TestDFSClientRetries.java       |  11 +-
 .../org/apache/hadoop/hdfs/TestDFSShell.java    | 177 ++++-
 .../org/apache/hadoop/hdfs/TestDFSUtil.java     |   2 +-
 .../hadoop/hdfs/TestDistributedFileSystem.java  |   1 -
 .../apache/hadoop/hdfs/TestEncryptionZones.java | 756 +++++++++++++++++++
 .../apache/hadoop/hdfs/TestFileCreation.java    |   2 +-
 .../java/org/apache/hadoop/hdfs/TestLease.java  |   9 +-
 .../hadoop/hdfs/TestReservedRawPaths.java       | 350 +++++++++
 .../hdfs/crypto/TestHdfsCryptoStreams.java      |  91 +++
 .../hdfs/server/namenode/FSXAttrBaseTest.java   | 179 ++++-
 .../server/namenode/NNThroughputBenchmark.java  |   5 +-
 .../hdfs/server/namenode/TestAddBlockRetry.java |   4 +-
 .../hdfs/server/namenode/TestFSDirectory.java   |  15 +-
 .../hadoop/hdfs/server/namenode/TestFsck.java   |   2 +-
 .../server/namenode/TestNamenodeRetryCache.java |   9 +-
 .../namenode/ha/TestRetryCacheWithHA.java       |   2 +-
 .../apache/hadoop/hdfs/web/TestJsonUtil.java    |   2 +-
 .../src/test/resources/testCryptoConf.xml       | 284 +++++++
 .../src/test/resources/testXAttrConf.xml        |  58 +-
 .../CHANGES-fs-encryption.txt                   |  20 +
 .../org/apache/hadoop/mapred/BackupStore.java   |   6 +-
 .../java/org/apache/hadoop/mapred/IFile.java    |  21 +-
 .../java/org/apache/hadoop/mapred/MapTask.java  |  29 +-
 .../java/org/apache/hadoop/mapred/Merger.java   |  15 +-
 .../apache/hadoop/mapreduce/CryptoUtils.java    | 199 +++++
 .../apache/hadoop/mapreduce/JobSubmitter.java   |   9 +-
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  13 +
 .../hadoop/mapreduce/task/reduce/Fetcher.java   |  12 +-
 .../mapreduce/task/reduce/LocalFetcher.java     |   4 +
 .../mapreduce/task/reduce/MergeManagerImpl.java |  32 +-
 .../mapreduce/task/reduce/OnDiskMapOutput.java  |   3 +-
 .../src/site/markdown/DistCp.md.vm              |  20 +
 .../mapreduce/task/reduce/TestMerger.java       | 146 +++-
 .../org/apache/hadoop/mapred/TestIFile.java     |  11 +-
 .../TestMRIntermediateDataEncryption.java       | 254 +++++++
 .../apache/hadoop/mapred/TestReduceTask.java    |   2 +-
 .../mapred/pipes/TestPipeApplication.java       |  12 +-
 hadoop-project-dist/pom.xml                     |  12 +
 hadoop-project/pom.xml                          |   2 +
 hadoop-project/src/site/site.xml                |   1 +
 .../apache/hadoop/tools/DistCpConstants.java    |   6 +
 .../apache/hadoop/tools/DistCpOptionSwitch.java |   6 +-
 .../org/apache/hadoop/tools/DistCpOptions.java  |  21 +-
 .../apache/hadoop/tools/SimpleCopyListing.java  |  48 +-
 .../hadoop/tools/mapred/CopyCommitter.java      |   9 +-
 .../apache/hadoop/tools/mapred/CopyMapper.java  |  10 +-
 .../apache/hadoop/tools/util/DistCpUtils.java   |  48 +-
 .../hadoop/tools/TestDistCpWithRawXAttrs.java   | 170 +++++
 .../hadoop/tools/TestDistCpWithXAttrs.java      |  71 +-
 .../apache/hadoop/tools/TestOptionsParser.java  |   3 +-
 .../hadoop/tools/util/DistCpTestUtils.java      |  89 +++
 .../hadoop/tools/util/TestDistCpUtils.java      |   6 +-
 150 files changed, 11931 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/BUILDING.txt
----------------------------------------------------------------------
diff --git a/BUILDING.txt b/BUILDING.txt
index 59eb447..3940a98 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -81,6 +81,27 @@ Maven build goals:
     the final tar file. This option requires that -Dsnappy.lib is also given,
     and it ignores the -Dsnappy.prefix option.
 
+ OpenSSL build options:
+
+   OpenSSL includes a crypto library that can be utilized by the native code.
+   It is currently an optional component, meaning that Hadoop can be built with
+   or without this dependency.
+
+  * Use -Drequire.openssl to fail the build if libcrypto.so is not found.
+    If this option is not specified and the openssl library is missing,
+    we silently build a version of libhadoop.so that cannot make use of
+    openssl. This option is recommended if you plan on making use of openssl 
+    and want to get more repeatable builds.
+  * Use -Dopenssl.prefix to specify a nonstandard location for the libcrypto
+    header files and library files. You do not need this option if you have
+    installed openssl using a package manager.
+  * Use -Dopenssl.lib to specify a nonstandard location for the libcrypto library
+    files. Similarly to openssl.prefix, you do not need this option if you have
+    installed openssl using a package manager.
+  * Use -Dbundle.openssl to copy the contents of the openssl.lib directory into
+    the final tar file. This option requires that -Dopenssl.lib is also given,
+    and it ignores the -Dopenssl.prefix option.
+
    Tests options:
 
   * Use -DskipTests to skip tests when running the following Maven goals:
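(For illustration, and not part of the patch: with these flags, a native build
that must link against OpenSSL could be invoked as
"mvn package -Pdist,native -DskipTests -Drequire.openssl -Dopenssl.prefix=/usr/local/ssl",
where the profile and prefix path are placeholders for your environment.)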

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt b/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt
new file mode 100644
index 0000000..d036e71
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/CHANGES-fs-encryption.txt
@@ -0,0 +1,61 @@
+Hadoop Common Change Log for HDFS-6134 and HADOOP-10150
+
+fs-encryption (Unreleased)
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+    HADOOP-10734. Implement high-performance secure random number sources.
+    (Yi Liu via Colin Patrick McCabe)
+
+  IMPROVEMENTS
+
+    HADOOP-10603. Crypto input and output streams implementing Hadoop stream
+    interfaces. (Yi Liu and Charles Lamb)
+
+    HADOOP-10628. Javadoc and few code style improvement for Crypto
+    input and output streams. (Yi Liu via clamb)
+
+    HADOOP-10632. Minor improvements to Crypto input and output streams. 
+    (Yi Liu)
+
+    HADOOP-10635. Add a method to CryptoCodec to generate SRNs for IV. (Yi Liu)
+
+    HADOOP-10653. Add a new constructor for CryptoInputStream that 
+    receives current position of wrapped stream. (Yi Liu)
+
+    HADOOP-10662. NullPointerException in CryptoInputStream while wrapped
+    stream is not ByteBufferReadable. Add tests using normal stream. (Yi Liu)
+
+    HADOOP-10713. Refactor CryptoCodec#generateSecureRandom to take a byte[]. 
+    (wang via yliu)
+
+    HADOOP-10693. Implementation of AES-CTR CryptoCodec using JNI to OpenSSL. 
+    (Yi Liu via cmccabe)
+
+    HADOOP-10803. Update OpensslCipher#getInstance to accept CipherSuite#name
+    format. (Yi Liu)
+
+    HADOOP-10735. Fall back AesCtrCryptoCodec implementation from OpenSSL to
+    JCE if non native support. (Yi Liu)
+
+    HADOOP-10870. Failed to load OpenSSL cipher error logs on systems with old
+    openssl versions (cmccabe)
+
+    HADOOP-10853. Refactor get instance of CryptoCodec and support create via
+    algorithm/mode/padding. (Yi Liu)
+
+    HADOOP-10919. Copy command should preserve raw.* namespace
+    extended attributes. (clamb)
+
+    HDFS-6873. Constants in CommandWithDestination should be static. (clamb)
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
+    HADOOP-10871. incorrect prototype in OpensslSecureRandom.c (cmccabe)
+
+    HADOOP-10886. CryptoCodec#getCodecclasses throws NPE when configurations not 
+    loaded. (umamahesh)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 1c6d2dd..cb6bafa 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -524,6 +524,10 @@
         <snappy.lib></snappy.lib>
         <snappy.include></snappy.include>
         <require.snappy>false</require.snappy>
+        <openssl.prefix></openssl.prefix>
+        <openssl.lib></openssl.lib>
+        <openssl.include></openssl.include>
+        <require.openssl>false</require.openssl>
       </properties>
       <build>
         <plugins>
@@ -573,6 +577,8 @@
                     <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
+                    <javahClassName>org.apache.hadoop.crypto.OpensslCipher</javahClassName>
+                    <javahClassName>org.apache.hadoop.crypto.random.OpensslSecureRandom</javahClassName>
                     <javahClassName>org.apache.hadoop.util.NativeCrc32</javahClassName>
                     <javahClassName>org.apache.hadoop.net.unix.DomainSocket</javahClassName>
                     <javahClassName>org.apache.hadoop.net.unix.DomainSocketWatcher</javahClassName>
@@ -593,7 +599,7 @@
                 <configuration>
                   <target>
                     <exec executable="cmake" dir="${project.build.directory}/native" failonerror="true">
-                      <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DREQUIRE_BZIP2=${require.bzip2} -DREQUIRE_SNAPPY=${require.snappy} -DCUSTOM_SNAPPY_PREFIX=${snappy.prefix} -DCUSTOM_SNAPPY_LIB=${snappy.lib} -DCUSTOM_SNAPPY_INCLUDE=${snappy.include}"/>
+                      <arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DREQUIRE_BZIP2=${require.bzip2} -DREQUIRE_SNAPPY=${require.snappy} -DCUSTOM_SNAPPY_PREFIX=${snappy.prefix} -DCUSTOM_SNAPPY_LIB=${snappy.lib} -DCUSTOM_SNAPPY_INCLUDE=${snappy.include} -DREQUIRE_OPENSSL=${require.openssl} -DCUSTOM_OPENSSL_PREFIX=${openssl.prefix} -DCUSTOM_OPENSSL_LIB=${openssl.lib} -DCUSTOM_OPENSSL_INCLUDE=${openssl.include}"/>
                     </exec>
                     <exec executable="make" dir="${project.build.directory}/native" failonerror="true">
                       <arg line="VERBOSE=1"/>
@@ -637,6 +643,11 @@
         <snappy.include></snappy.include>
         <require.snappy>false</require.snappy>
         <bundle.snappy.in.bin>true</bundle.snappy.in.bin>
+        <openssl.prefix></openssl.prefix>
+        <openssl.lib></openssl.lib>
+        <openssl.include></openssl.include>
+        <require.openssl>false</require.openssl>
+        <bundle.openssl.in.bin>true</bundle.openssl.in.bin>
       </properties>
       <build>
         <plugins>
@@ -682,6 +693,8 @@
                     <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
                     <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
+                    <javahClassName>org.apache.hadoop.crypto.OpensslCipher</javahClassName>
+                    <javahClassName>org.apache.hadoop.crypto.random.OpensslSecureRandom</javahClassName>
                     <javahClassName>org.apache.hadoop.util.NativeCrc32</javahClassName>
                   </javahClassNames>
                   <javahOutputDirectory>${project.build.directory}/native/javah</javahOutputDirectory>
@@ -726,6 +739,10 @@
                     <argument>/p:CustomSnappyLib=${snappy.lib}</argument>
                     <argument>/p:CustomSnappyInclude=${snappy.include}</argument>
                     <argument>/p:RequireSnappy=${require.snappy}</argument>
+                    <argument>/p:CustomOpensslPrefix=${openssl.prefix}</argument>
+                    <argument>/p:CustomOpensslLib=${openssl.lib}</argument>
+                    <argument>/p:CustomOpensslInclude=${openssl.include}</argument>
+                    <argument>/p:RequireOpenssl=${require.openssl}</argument>
                   </arguments>
                 </configuration>
               </execution>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index dec63c4..84c27e5 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -145,6 +145,38 @@ else (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
     ENDIF(REQUIRE_SNAPPY)
 endif (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
 
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES CMAKE_FIND_LIBRARY_SUFFIXES)
+set_find_shared_library_version("1.0.0")
+SET(OPENSSL_NAME "crypto")
+IF(${CMAKE_SYSTEM_NAME} MATCHES "Windows")
+    SET(OPENSSL_NAME "eay32")
+ENDIF()
+find_library(OPENSSL_LIBRARY
+    NAMES ${OPENSSL_NAME}
+    PATHS ${CUSTOM_OPENSSL_PREFIX} ${CUSTOM_OPENSSL_PREFIX}/lib
+          ${CUSTOM_OPENSSL_PREFIX}/lib64 ${CUSTOM_OPENSSL_LIB} NO_DEFAULT_PATH)
+find_library(OPENSSL_LIBRARY
+    NAMES ${OPENSSL_NAME})
+SET(CMAKE_FIND_LIBRARY_SUFFIXES STORED_CMAKE_FIND_LIBRARY_SUFFIXES)
+find_path(OPENSSL_INCLUDE_DIR 
+    NAMES openssl/evp.h
+    PATHS ${CUSTOM_OPENSSL_PREFIX} ${CUSTOM_OPENSSL_PREFIX}/include
+          ${CUSTOM_OPENSSL_INCLUDE} NO_DEFAULT_PATH)
+find_path(OPENSSL_INCLUDE_DIR 
+    NAMES openssl/evp.h)
+if (OPENSSL_LIBRARY AND OPENSSL_INCLUDE_DIR)
+    GET_FILENAME_COMPONENT(HADOOP_OPENSSL_LIBRARY ${OPENSSL_LIBRARY} NAME)
+    SET(OPENSSL_SOURCE_FILES
+        "${D}/crypto/OpensslCipher.c"
+        "${D}/crypto/random/OpensslSecureRandom.c")
+else (OPENSSL_LIBRARY AND OPENSSL_INCLUDE_DIR)
+    SET(OPENSSL_INCLUDE_DIR "")
+    SET(OPENSSL_SOURCE_FILES "")
+    IF(REQUIRE_OPENSSL)
+        MESSAGE(FATAL_ERROR "Required openssl library could not be found.  OPENSSL_LIBRARY=${OPENSSL_LIBRARY}, OPENSSL_INCLUDE_DIR=${OPENSSL_INCLUDE_DIR}, CUSTOM_OPENSSL_INCLUDE_DIR=${CUSTOM_OPENSSL_INCLUDE_DIR}, CUSTOM_OPENSSL_PREFIX=${CUSTOM_OPENSSL_PREFIX}, CUSTOM_OPENSSL_INCLUDE=${CUSTOM_OPENSSL_INCLUDE}")
+    ENDIF(REQUIRE_OPENSSL)
+endif (OPENSSL_LIBRARY AND OPENSSL_INCLUDE_DIR)
+
 include_directories(
     ${GENERATED_JAVAH}
     main/native/src
@@ -155,6 +187,7 @@ include_directories(
     ${ZLIB_INCLUDE_DIRS}
     ${BZIP2_INCLUDE_DIR}
     ${SNAPPY_INCLUDE_DIR}
+    ${OPENSSL_INCLUDE_DIR}
     ${D}/util
 )
 CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
@@ -172,6 +205,7 @@ add_dual_library(hadoop
     ${D}/io/compress/lz4/lz4.c
     ${D}/io/compress/lz4/lz4hc.c
     ${SNAPPY_SOURCE_FILES}
+    ${OPENSSL_SOURCE_FILES}
     ${D}/io/compress/zlib/ZlibCompressor.c
     ${D}/io/compress/zlib/ZlibDecompressor.c
     ${BZIP2_SOURCE_FILES}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/src/config.h.cmake
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake
index 020017c..d71271d 100644
--- a/hadoop-common-project/hadoop-common/src/config.h.cmake
+++ b/hadoop-common-project/hadoop-common/src/config.h.cmake
@@ -21,6 +21,7 @@
 #cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@"
 #cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@"
 #cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@"
+#cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@"
 #cmakedefine HAVE_SYNC_FILE_RANGE
 #cmakedefine HAVE_POSIX_FADVISE
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AesCtrCryptoCodec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AesCtrCryptoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AesCtrCryptoCodec.java
new file mode 100644
index 0000000..8f8bc66
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/AesCtrCryptoCodec.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class AesCtrCryptoCodec extends CryptoCodec {
+
+  protected static final CipherSuite SUITE = CipherSuite.AES_CTR_NOPADDING;
+
+  /**
+   * For AES, the algorithm block size is fixed at 128 bits.
+   * @see http://en.wikipedia.org/wiki/Advanced_Encryption_Standard
+   */
+  private static final int AES_BLOCK_SIZE = SUITE.getAlgorithmBlockSize();
+  private static final int CTR_OFFSET = 8;
+
+  @Override
+  public CipherSuite getCipherSuite() {
+    return SUITE;
+  }
+  
+  /**
+   * The IV is produced by adding the initial IV to the counter. IV length 
+   * should be the same as {@link #AES_BLOCK_SIZE}
+   */
+  @Override
+  public void calculateIV(byte[] initIV, long counter, byte[] IV) {
+    Preconditions.checkArgument(initIV.length == AES_BLOCK_SIZE);
+    Preconditions.checkArgument(IV.length == AES_BLOCK_SIZE);
+    
+    System.arraycopy(initIV, 0, IV, 0, CTR_OFFSET);
+    long l = 0;
+    for (int i = 0; i < 8; i++) {
+      l = ((l << 8) | (initIV[CTR_OFFSET + i] & 0xff));
+    }
+    l += counter;
+    IV[CTR_OFFSET + 0] = (byte) (l >>> 56);
+    IV[CTR_OFFSET + 1] = (byte) (l >>> 48);
+    IV[CTR_OFFSET + 2] = (byte) (l >>> 40);
+    IV[CTR_OFFSET + 3] = (byte) (l >>> 32);
+    IV[CTR_OFFSET + 4] = (byte) (l >>> 24);
+    IV[CTR_OFFSET + 5] = (byte) (l >>> 16);
+    IV[CTR_OFFSET + 6] = (byte) (l >>> 8);
+    IV[CTR_OFFSET + 7] = (byte) (l);
+  }
+}
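The IV arithmetic above can be exercised in isolation. A minimal, hypothetical
demo (the zero initial IV is chosen purely for illustration; this class is not
part of the patch):

    import java.math.BigInteger;
    import java.util.Arrays;

    /** Hypothetical demo of the CTR IV derivation in calculateIV() above. */
    public class CtrIvDemo {
      static void calculateIV(byte[] initIV, long counter, byte[] iv) {
        // The first 8 bytes of the IV are copied unchanged from the initial IV.
        System.arraycopy(initIV, 0, iv, 0, 8);
        // The last 8 bytes are read as a big-endian long, the counter is
        // added, and the sum is written back big-endian, as in the code above.
        long l = 0;
        for (int i = 0; i < 8; i++) {
          l = (l << 8) | (initIV[8 + i] & 0xff);
        }
        l += counter;
        for (int i = 0; i < 8; i++) {
          iv[15 - i] = (byte) (l >>> (8 * i));
        }
      }

      public static void main(String[] args) {
        byte[] initIV = new byte[16]; // zero IV, for illustration only
        byte[] iv = new byte[16];
        calculateIV(initIV, 100 / 16, iv); // position 100, 16-byte blocks -> counter 6
        System.out.println(new BigInteger(1, Arrays.copyOfRange(iv, 8, 16))); // prints 6
      }
    }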

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherSuite.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherSuite.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherSuite.java
new file mode 100644
index 0000000..9962b38
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CipherSuite.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.crypto;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Defines properties of a CipherSuite. Modeled after the ciphers in
+ * {@link javax.crypto.Cipher}.
+ */
+@InterfaceAudience.Private
+public enum CipherSuite {
+  UNKNOWN("Unknown", 0),
+  AES_CTR_NOPADDING("AES/CTR/NoPadding", 16);
+
+  private final String name;
+  private final int algoBlockSize;
+
+  private Integer unknownValue = null;
+
+  CipherSuite(String name, int algoBlockSize) {
+    this.name = name;
+    this.algoBlockSize = algoBlockSize;
+  }
+
+  public void setUnknownValue(int unknown) {
+    this.unknownValue = unknown;
+  }
+
+  public int getUnknownValue() {
+    return unknownValue;
+  }
+
+  /**
+   * @return name of cipher suite, as in {@link javax.crypto.Cipher}
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return size of an algorithm block in bytes
+   */
+  public int getAlgorithmBlockSize() {
+    return algoBlockSize;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("{");
+    builder.append("name: " + name);
+    builder.append(", algorithmBlockSize: " + algoBlockSize);
+    if (unknownValue != null) {
+      builder.append(", unknownValue: " + unknownValue);
+    }
+    builder.append("}");
+    return builder.toString();
+  }
+  
+  public static void checkName(String name) {
+    CipherSuite[] suites = CipherSuite.values();
+    for (CipherSuite suite : suites) {
+      if (suite.getName().equals(name)) {
+        return;
+      }
+    }
+    throw new IllegalArgumentException("Invalid cipher suite name: " + name);
+  }
+  
+  /**
+   * Convert a name to a CipherSuite. {@link #algoBlockSize} is fixed for
+   * a given cipher suite, so comparing the name is sufficient.
+   * @param name cipher suite name
+   * @return CipherSuite cipher suite
+   */
+  public static CipherSuite convert(String name) {
+    CipherSuite[] suites = CipherSuite.values();
+    for (CipherSuite suite : suites) {
+      if (suite.getName().equals(name)) {
+        return suite;
+      }
+    }
+    throw new IllegalArgumentException("Invalid cipher suite name: " + name);
+  }
+  
+  /**
+   * Returns suffix of cipher suite configuration.
+   * @return String configuration suffix
+   */
+  public String getConfigSuffix() {
+    String[] parts = name.split("/");
+    StringBuilder suffix = new StringBuilder();
+    for (String part : parts) {
+      suffix.append(".").append(part.toLowerCase());
+    }
+    
+    return suffix.toString();
+  }
+}
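For example, AES_CTR_NOPADDING.getConfigSuffix() returns ".aes.ctr.nopadding".
CryptoCodec (below) appends this suffix to
HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX to build the lookup key;
assuming the prefix value "hadoop.security.crypto.codec.classes", the full key
is hadoop.security.crypto.codec.classes.aes.ctr.nopadding.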

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java
new file mode 100644
index 0000000..9de7f95
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoCodec.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.security.GeneralSecurityException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CIPHER_SUITE_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT;
+
+/**
+ * Crypto codec class, encapsulates encryptor/decryptor pair.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class CryptoCodec implements Configurable {
+  public static Logger LOG = LoggerFactory.getLogger(CryptoCodec.class);
+  
+  /**
+   * Get crypto codec for specified algorithm/mode/padding.
+   * 
+   * @param conf
+   *          the configuration
+   * @param cipherSuite
+   *          algorithm/mode/padding
+   * @return CryptoCodec the codec object. A null value is returned if no
+   *         crypto codec classes are configured for the cipher suite.
+   */
+  public static CryptoCodec getInstance(Configuration conf, 
+      CipherSuite cipherSuite) {
+    List<Class<? extends CryptoCodec>> klasses = getCodecClasses(
+        conf, cipherSuite);
+    if (klasses == null) {
+      return null;
+    }
+    CryptoCodec codec = null;
+    for (Class<? extends CryptoCodec> klass : klasses) {
+      try {
+        CryptoCodec c = ReflectionUtils.newInstance(klass, conf);
+        if (c.getCipherSuite().getName().equals(cipherSuite.getName())) {
+          if (codec == null) {
+            LOG.debug("Using crypto codec {}.", klass.getName());
+            codec = c;
+          }
+        } else {
+          LOG.warn("Crypto codec {} doesn't meet the cipher suite {}.", 
+              klass.getName(), cipherSuite.getName());
+        }
+      } catch (Exception e) {
+        LOG.warn("Crypto codec {} is not available.", klass.getName());
+      }
+    }
+    
+    if (codec != null) {
+      return codec;
+    }
+    
+    throw new RuntimeException("No available crypto codec which meets " + 
+        "the cipher suite " + cipherSuite.getName() + ".");
+  }
+  
+  /**
+   * Get crypto codec for algorithm/mode/padding in config value
+   * hadoop.security.crypto.cipher.suite
+   * 
+   * @param conf
+   *          the configuration
+   * @return CryptoCodec the codec object. A null value is returned if no
+   *         crypto codec classes are configured for the cipher suite.
+   */
+  public static CryptoCodec getInstance(Configuration conf) {
+    String name = conf.get(HADOOP_SECURITY_CRYPTO_CIPHER_SUITE_KEY, 
+        HADOOP_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT);
+    return getInstance(conf, CipherSuite.convert(name));
+  }
+  
+  private static List<Class<? extends CryptoCodec>> getCodecClasses(
+      Configuration conf, CipherSuite cipherSuite) {
+    List<Class<? extends CryptoCodec>> result = Lists.newArrayList();
+    String configName = HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX + 
+        cipherSuite.getConfigSuffix();
+    String codecString = conf.get(configName);
+    if (codecString == null) {
+      LOG.warn("No crypto codec classes with cipher suite configured.");
+      return null;
+    }
+    for (String c : Splitter.on(',').trimResults().omitEmptyStrings().
+        split(codecString)) {
+      try {
+        Class<?> cls = conf.getClassByName(c);
+        result.add(cls.asSubclass(CryptoCodec.class));
+      } catch (ClassCastException e) {
+        LOG.warn("Class " + c + " is not a CryptoCodec.");
+      } catch (ClassNotFoundException e) {
+        LOG.warn("Crypto codec " + c + " not found.");
+      }
+    }
+    
+    return result;
+  }
+
+  /**
+   * @return the CipherSuite for this codec.
+   */
+  public abstract CipherSuite getCipherSuite();
+
+  /**
+   * Create a {@link org.apache.hadoop.crypto.Encryptor}. 
+   * @return Encryptor the encryptor
+   */
+  public abstract Encryptor createEncryptor() throws GeneralSecurityException;
+  
+  /**
+   * Create a {@link org.apache.hadoop.crypto.Decryptor}.
+   * @return Decryptor the decryptor
+   */
+  public abstract Decryptor createDecryptor() throws GeneralSecurityException;
+  
+  /**
+   * This interface is only for Counter (CTR) mode. Generally the Encryptor
+   * or Decryptor calculates the IV and maintains encryption context internally.
+   * For example a {@link javax.crypto.Cipher} will maintain its encryption 
+   * context internally when we do encryption/decryption using the 
+   * Cipher#update interface. 
+   * <p/>
+   * Encryption/Decryption is not always on the entire file. For example,
+   * in Hadoop, a node may only decrypt a portion of a file (i.e. a split).
+   * In these situations, the counter is derived from the file position.
+   * <p/>
+   * The IV can be calculated by combining the initial IV and the counter with 
+   * a lossless operation (concatenation, addition, or XOR).
+   * @see http://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Counter_.28CTR.29
+   * 
+   * @param initIV initial IV
+   * @param counter counter for input stream position 
+   * @param IV the IV for input stream position
+   */
+  public abstract void calculateIV(byte[] initIV, long counter, byte[] IV);
+  
+  /**
+   * Generate a number of secure, random bytes suitable for cryptographic use.
+   * This method needs to be thread-safe.
+   *
+   * @param bytes byte array to populate with random data
+   */
+  public abstract void generateSecureRandom(byte[] bytes);
+}
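A minimal usage sketch for the factory above (hypothetical caller; assumes a
codec class is configured for the default cipher suite):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.crypto.CryptoCodec;

    public class CodecUsageSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Resolves hadoop.security.crypto.cipher.suite, then returns the first
        // configured codec class implementing that suite.
        CryptoCodec codec = CryptoCodec.getInstance(conf);
        byte[] key = new byte[16];
        byte[] iv = new byte[codec.getCipherSuite().getAlgorithmBlockSize()];
        codec.generateSecureRandom(key); // thread-safe secure random bytes
        codec.generateSecureRandom(iv);
        // key and iv can now be passed to CryptoInputStream/CryptoOutputStream.
      }
    }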

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
new file mode 100644
index 0000000..e8964ed
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -0,0 +1,680 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.EnumSet;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.HasFileDescriptor;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
+ * required in order to ensure that the plain text and cipher text have a 1:1
+ * mapping. The decryption is buffer based. The key points of the decryption
+ * are calculating (1) the counter and (2) the padding from the stream position:
+ * <p/>
+ * counter = base + pos/(algorithm blocksize); 
+ * padding = pos%(algorithm blocksize); 
+ * <p/>
+ * The underlying stream offset is maintained as state.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class CryptoInputStream extends FilterInputStream implements 
+    Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, 
+    CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
+  private static final byte[] oneByteBuf = new byte[1];
+  private final CryptoCodec codec;
+  private final Decryptor decryptor;
+  private final int bufferSize;
+  
+  /**
+   * Input data buffer. The data starts at inBuffer.position() and ends
+   * at inBuffer.limit().
+   */
+  private ByteBuffer inBuffer;
+  
+  /**
+   * The decrypted data buffer. The data starts at outBuffer.position() and 
+   * ends at outBuffer.limit();
+   */
+  private ByteBuffer outBuffer;
+  private long streamOffset = 0; // Underlying stream offset.
+  
+  /**
+   * Whether the underlying stream supports 
+   * {@link org.apache.hadoop.fs.ByteBufferReadable}
+   */
+  private Boolean usingByteBufferRead = null;
+  
+  /**
+   * Padding = pos%(algorithm blocksize); Padding is put into {@link #inBuffer} 
+   * before any other data goes in. The purpose of padding is to put the input 
+   * data at the proper position.
+   */
+  private byte padding;
+  private boolean closed;
+  private final byte[] key;
+  private final byte[] initIV;
+  private byte[] iv;
+  
+  /** DirectBuffer pool */
+  private final Queue<ByteBuffer> bufferPool = 
+      new ConcurrentLinkedQueue<ByteBuffer>();
+  /** Decryptor pool */
+  private final Queue<Decryptor> decryptorPool = 
+      new ConcurrentLinkedQueue<Decryptor>();
+  
+  public CryptoInputStream(InputStream in, CryptoCodec codec, 
+      int bufferSize, byte[] key, byte[] iv) throws IOException {
+    this(in, codec, bufferSize, key, iv, 
+        CryptoStreamUtils.getInputStreamOffset(in));
+  }
+  
+  public CryptoInputStream(InputStream in, CryptoCodec codec,
+      int bufferSize, byte[] key, byte[] iv, long streamOffset) throws IOException {
+    super(in);
+    this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize);
+    this.codec = codec;
+    this.key = key.clone();
+    this.initIV = iv.clone();
+    this.iv = iv.clone();
+    this.streamOffset = streamOffset;
+    inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
+    outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
+    decryptor = getDecryptor();
+    resetStreamOffset(streamOffset);
+  }
+  
+  public CryptoInputStream(InputStream in, CryptoCodec codec,
+      byte[] key, byte[] iv) throws IOException {
+    this(in, codec, CryptoStreamUtils.getBufferSize(codec.getConf()), key, iv);
+  }
+  
+  public InputStream getWrappedStream() {
+    return in;
+  }
+  
+  /**
+   * Decryption is buffer based.
+   * If there is data in {@link #outBuffer}, then read it out of this buffer.
+   * If there is no data in {@link #outBuffer}, then read more from the 
+   * underlying stream and do the decryption.
+   * @param b the buffer into which the decrypted data is read.
+   * @param off the buffer offset.
+   * @param len the maximum number of decrypted data bytes to read.
+   * @return int the total number of decrypted data bytes read into the buffer.
+   * @throws IOException
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    checkStream();
+    if (b == null) {
+      throw new NullPointerException();
+    } else if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return 0;
+    }
+    
+    final int remaining = outBuffer.remaining();
+    if (remaining > 0) {
+      int n = Math.min(len, remaining);
+      outBuffer.get(b, off, n);
+      return n;
+    } else {
+      int n = 0;
+      
+      /*
+       * Check whether the underlying stream is {@link ByteBufferReadable};
+       * if so, a byte-array copy can be avoided.
+       */
+      if (usingByteBufferRead == null) {
+        if (in instanceof ByteBufferReadable) {
+          try {
+            n = ((ByteBufferReadable) in).read(inBuffer);
+            usingByteBufferRead = Boolean.TRUE;
+          } catch (UnsupportedOperationException e) {
+            usingByteBufferRead = Boolean.FALSE;
+          }
+        } else {
+          usingByteBufferRead = Boolean.FALSE;
+        }
+        if (!usingByteBufferRead) {
+          n = readFromUnderlyingStream(inBuffer);
+        }
+      } else {
+        if (usingByteBufferRead) {
+          n = ((ByteBufferReadable) in).read(inBuffer);
+        } else {
+          n = readFromUnderlyingStream(inBuffer);
+        }
+      }
+      if (n <= 0) {
+        return n;
+      }
+      
+      streamOffset += n; // Read n bytes
+      decrypt(decryptor, inBuffer, outBuffer, padding);
+      padding = afterDecryption(decryptor, inBuffer, streamOffset, iv);
+      n = Math.min(len, outBuffer.remaining());
+      outBuffer.get(b, off, n);
+      return n;
+    }
+  }
+  
+  /** Read data from underlying stream. */
+  private int readFromUnderlyingStream(ByteBuffer inBuffer) throws IOException {
+    final int toRead = inBuffer.remaining();
+    final byte[] tmp = getTmpBuf();
+    final int n = in.read(tmp, 0, toRead);
+    if (n > 0) {
+      inBuffer.put(tmp, 0, n);
+    }
+    return n;
+  }
+  
+  private byte[] tmpBuf;
+  private byte[] getTmpBuf() {
+    if (tmpBuf == null) {
+      tmpBuf = new byte[bufferSize];
+    }
+    return tmpBuf;
+  }
+  
+  /**
+   * Do the decryption using inBuffer as input and outBuffer as output.
+   * Upon return, inBuffer is cleared; the decrypted data starts at 
+   * outBuffer.position() and ends at outBuffer.limit();
+   */
+  private void decrypt(Decryptor decryptor, ByteBuffer inBuffer, 
+      ByteBuffer outBuffer, byte padding) throws IOException {
+    Preconditions.checkState(inBuffer.position() >= padding);
+    if(inBuffer.position() == padding) {
+      // There is no real data in inBuffer.
+      return;
+    }
+    inBuffer.flip();
+    outBuffer.clear();
+    decryptor.decrypt(inBuffer, outBuffer);
+    inBuffer.clear();
+    outBuffer.flip();
+    if (padding > 0) {
+      /*
+       * The plain text and cipher text have a 1:1 mapping; they start at the
+       * same position.
+       */
+      outBuffer.position(padding);
+    }
+  }
+  
+  /**
+   * This method is executed immediately after decryption. Check whether 
+   * decryptor should be updated and recalculate padding if needed. 
+   */
+  private byte afterDecryption(Decryptor decryptor, ByteBuffer inBuffer, 
+      long position, byte[] iv) throws IOException {
+    byte padding = 0;
+    if (decryptor.isContextReset()) {
+      /*
+       * This code is generally not executed since the decryptor usually 
+       * maintains decryption context (e.g. the counter) internally. However, 
+       * some implementations can't maintain context so a re-init is necessary 
+       * after each decryption call.
+       */
+      updateDecryptor(decryptor, position, iv);
+      padding = getPadding(position);
+      inBuffer.position(padding);
+    }
+    return padding;
+  }
+  
+  private long getCounter(long position) {
+    return position / codec.getCipherSuite().getAlgorithmBlockSize();
+  }
+  
+  private byte getPadding(long position) {
+    return (byte)(position % codec.getCipherSuite().getAlgorithmBlockSize());
+  }
+  
+  /** Calculate the counter and iv, update the decryptor. */
+  private void updateDecryptor(Decryptor decryptor, long position, byte[] iv) 
+      throws IOException {
+    final long counter = getCounter(position);
+    codec.calculateIV(initIV, counter, iv);
+    decryptor.init(key, iv);
+  }
+  
+  /**
+   * Reset the underlying stream offset; clear {@link #inBuffer} and 
+   * {@link #outBuffer}. This typically happens during {@link #seek(long)}
+   * or {@link #skip(long)}.
+   */
+  private void resetStreamOffset(long offset) throws IOException {
+    streamOffset = offset;
+    inBuffer.clear();
+    outBuffer.clear();
+    outBuffer.limit(0);
+    updateDecryptor(decryptor, offset, iv);
+    padding = getPadding(offset);
+    inBuffer.position(padding); // Set proper position for input data.
+  }
+  
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    
+    super.close();
+    freeBuffers();
+    closed = true;
+  }
+  
+  /** Positioned read. It is thread-safe */
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    checkStream();
+    try {
+      final int n = ((PositionedReadable) in).read(position, buffer, offset, 
+          length);
+      if (n > 0) {
+        // This operation does not change the current offset of the file
+        decrypt(position, buffer, offset, n);
+      }
+      
+      return n;
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("This stream does not support " +
+          "positioned read.");
+    }
+  }
+  
+  /**
+   * Decrypt length bytes in buffer starting at offset. Output is also put 
+   * into buffer starting at offset. It is thread-safe.
+   */
+  private void decrypt(long position, byte[] buffer, int offset, int length) 
+      throws IOException {
+    ByteBuffer inBuffer = getBuffer();
+    ByteBuffer outBuffer = getBuffer();
+    Decryptor decryptor = null;
+    try {
+      decryptor = getDecryptor();
+      byte[] iv = initIV.clone();
+      updateDecryptor(decryptor, position, iv);
+      byte padding = getPadding(position);
+      inBuffer.position(padding); // Set proper position for input data.
+      
+      int n = 0;
+      while (n < length) {
+        int toDecrypt = Math.min(length - n, inBuffer.remaining());
+        inBuffer.put(buffer, offset + n, toDecrypt);
+        // Do decryption
+        decrypt(decryptor, inBuffer, outBuffer, padding);
+        
+        outBuffer.get(buffer, offset + n, toDecrypt);
+        n += toDecrypt;
+        padding = afterDecryption(decryptor, inBuffer, position + n, iv);
+      }
+    } finally {
+      returnBuffer(inBuffer);
+      returnBuffer(outBuffer);
+      returnDecryptor(decryptor);
+    }
+  }
+  
+  /** Positioned read fully. It is thread-safe */
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    checkStream();
+    try {
+      ((PositionedReadable) in).readFully(position, buffer, offset, length);
+      if (length > 0) {
+        // This operation does not change the current offset of the file
+        decrypt(position, buffer, offset, length);
+      }
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("This stream does not support " +
+          "positioned readFully.");
+    }
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+    readFully(position, buffer, 0, buffer.length);
+  }
+
+  /** Seek to a position. */
+  @Override
+  public void seek(long pos) throws IOException {
+    Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
+    checkStream();
+    try {
+      /*
+       * If data of target pos in the underlying stream has already been read 
+       * and decrypted in outBuffer, we just need to re-position outBuffer.
+       */
+      if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
+        int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
+        if (forward > 0) {
+          outBuffer.position(outBuffer.position() + forward);
+        }
+      } else {
+        ((Seekable) in).seek(pos);
+        resetStreamOffset(pos);
+      }
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("This stream does not support " +
+          "seek.");
+    }
+  }
+  
+  /** Skip n bytes */
+  @Override
+  public long skip(long n) throws IOException {
+    Preconditions.checkArgument(n >= 0, "Negative skip length.");
+    checkStream();
+    
+    if (n == 0) {
+      return 0;
+    } else if (n <= outBuffer.remaining()) {
+      int pos = outBuffer.position() + (int) n;
+      outBuffer.position(pos);
+      return n;
+    } else {
+      /*
+       * Subtract outBuffer.remaining() to see how many bytes we need to 
+       * skip in the underlying stream. Add outBuffer.remaining() to the 
+       * actual number of skipped bytes in the underlying stream to get the 
+       * number of skipped bytes from the user's point of view.
+       */
+      n -= outBuffer.remaining();
+      long skipped = in.skip(n);
+      if (skipped < 0) {
+        skipped = 0;
+      }
+      long pos = streamOffset + skipped;
+      skipped += outBuffer.remaining();
+      resetStreamOffset(pos);
+      return skipped;
+    }
+  }
+
+  /** Get underlying stream position. */
+  @Override
+  public long getPos() throws IOException {
+    checkStream();
+    // Equals: ((Seekable) in).getPos() - outBuffer.remaining()
+    return streamOffset - outBuffer.remaining();
+  }
+  
+  /** ByteBuffer read. */
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    checkStream();
+    if (in instanceof ByteBufferReadable) {
+      final int unread = outBuffer.remaining();
+      if (unread > 0) { // Have unread decrypted data in buffer.
+        int toRead = buf.remaining();
+        if (toRead <= unread) {
+          final int limit = outBuffer.limit();
+          outBuffer.limit(outBuffer.position() + toRead);
+          buf.put(outBuffer);
+          outBuffer.limit(limit);
+          return toRead;
+        } else {
+          buf.put(outBuffer);
+        }
+      }
+      
+      final int pos = buf.position();
+      final int n = ((ByteBufferReadable) in).read(buf);
+      if (n > 0) {
+        streamOffset += n; // Read n bytes
+        decrypt(buf, n, pos);
+      }
+      return n;
+    }
+
+    throw new UnsupportedOperationException("ByteBuffer read unsupported " +
+        "by input stream.");
+  }
+  
+  /**
+   * Decrypt n bytes in buf, starting at the given start position. The
+   * output is written back into buf at the same start position.
+   * buf.position() and buf.limit() should be unchanged after decryption.
+   */
+  private void decrypt(ByteBuffer buf, int n, int start) 
+      throws IOException {
+    final int pos = buf.position();
+    final int limit = buf.limit();
+    int len = 0;
+    while (len < n) {
+      buf.position(start + len);
+      buf.limit(start + len + Math.min(n - len, inBuffer.remaining()));
+      inBuffer.put(buf);
+      // Do decryption
+      try {
+        decrypt(decryptor, inBuffer, outBuffer, padding);
+        buf.position(start + len);
+        buf.limit(limit);
+        len += outBuffer.remaining();
+        buf.put(outBuffer);
+      } finally {
+        padding = afterDecryption(decryptor, inBuffer, streamOffset - (n - len), iv);
+      }
+    }
+    buf.position(pos);
+  }
+  
+  @Override
+  public int available() throws IOException {
+    checkStream();
+    
+    return in.available() + outBuffer.remaining();
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+  
+  @Override
+  public void mark(int readLimit) {
+  }
+  
+  @Override
+  public void reset() throws IOException {
+    throw new IOException("Mark/reset not supported");
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    Preconditions.checkArgument(targetPos >= 0, 
+        "Cannot seek to negative offset.");
+    checkStream();
+    try {
+      boolean result = ((Seekable) in).seekToNewSource(targetPos);
+      resetStreamOffset(targetPos);
+      return result;
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("This stream does not support " +
+          "seekToNewSource.");
+    }
+  }
+
+  @Override
+  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
+      EnumSet<ReadOption> opts) throws IOException,
+      UnsupportedOperationException {
+    checkStream();
+    try {
+      if (outBuffer.remaining() > 0) {
+        // Have some decrypted data unread, need to reset.
+        ((Seekable) in).seek(getPos());
+        resetStreamOffset(getPos());
+      }
+      final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
+          read(bufferPool, maxLength, opts);
+      if (buffer != null) {
+        final int n = buffer.remaining();
+        if (n > 0) {
+          streamOffset += buffer.remaining(); // Read n bytes
+          final int pos = buffer.position();
+          decrypt(buffer, n, pos);
+        }
+      }
+      return buffer;
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("This stream does not support " + 
+          "enhanced byte buffer access.");
+    }
+  }
+
+  @Override
+  public void releaseBuffer(ByteBuffer buffer) {
+    try {
+      ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("This stream does not support " + 
+          "release buffer.");
+    }
+  }
+
+  @Override
+  public void setReadahead(Long readahead) throws IOException,
+      UnsupportedOperationException {
+    try {
+      ((CanSetReadahead) in).setReadahead(readahead);
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("This stream does not support " +
+          "setting the readahead caching strategy.");
+    }
+  }
+
+  @Override
+  public void setDropBehind(Boolean dropCache) throws IOException,
+      UnsupportedOperationException {
+    try {
+      ((CanSetDropBehind) in).setDropBehind(dropCache);
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("This stream does not " +
+          "support setting the drop-behind caching setting.");
+    }
+  }
+
+  @Override
+  public FileDescriptor getFileDescriptor() throws IOException {
+    if (in instanceof HasFileDescriptor) {
+      return ((HasFileDescriptor) in).getFileDescriptor();
+    } else if (in instanceof FileInputStream) {
+      return ((FileInputStream) in).getFD();
+    } else {
+      return null;
+    }
+  }
+  
+  @Override
+  public int read() throws IOException {
+    return (read(oneByteBuf, 0, 1) == -1) ? -1 : (oneByteBuf[0] & 0xff);
+  }
+  
+  private void checkStream() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+  }
+  
+  /** Get direct buffer from pool */
+  private ByteBuffer getBuffer() {
+    ByteBuffer buffer = bufferPool.poll();
+    if (buffer == null) {
+      buffer = ByteBuffer.allocateDirect(bufferSize);
+    }
+    
+    return buffer;
+  }
+  
+  /** Return direct buffer to pool */
+  private void returnBuffer(ByteBuffer buf) {
+    if (buf != null) {
+      buf.clear();
+      bufferPool.add(buf);
+    }
+  }
+  
+  /** Forcibly free the direct buffers. */
+  private void freeBuffers() {
+    CryptoStreamUtils.freeDB(inBuffer);
+    CryptoStreamUtils.freeDB(outBuffer);
+    cleanBufferPool();
+  }
+  
+  /** Clean direct buffer pool */
+  private void cleanBufferPool() {
+    ByteBuffer buf;
+    while ((buf = bufferPool.poll()) != null) {
+      CryptoStreamUtils.freeDB(buf);
+    }
+  }
+  
+  /** Get decryptor from pool */
+  private Decryptor getDecryptor() throws IOException {
+    Decryptor decryptor = decryptorPool.poll();
+    if (decryptor == null) {
+      try {
+        decryptor = codec.createDecryptor();
+      } catch (GeneralSecurityException e) {
+        throw new IOException(e);
+      }
+    }
+    
+    return decryptor;
+  }
+  
+  /** Return decryptor to pool */
+  private void returnDecryptor(Decryptor decryptor) {
+    if (decryptor != null) {
+      decryptorPool.add(decryptor);
+    }
+  }
+}
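
For reference, the positioned decrypt(ByteBuffer, int, int) above works because
in CTR mode byte pos of the stream is always encrypted with keystream block
pos/(algorithm blocksize) at intra-block offset pos%(algorithm blocksize).
Below is a minimal standalone sketch of that alignment using plain JCE,
assuming the 16-byte AES block size; the class and method names are
illustrative only and are not part of this patch.

    import javax.crypto.Cipher;
    import javax.crypto.spec.IvParameterSpec;
    import javax.crypto.spec.SecretKeySpec;

    public class CtrSeekSketch {
      static final int BLOCK = 16; // AES block size in bytes

      // Add counter into the 128-bit big-endian IV, the same idea as
      // AesCtrCryptoCodec#calculateIV earlier in this patch.
      static byte[] ivAtCounter(byte[] initIV, long counter) {
        byte[] iv = initIV.clone();
        for (int i = iv.length - 1; i >= 0 && counter != 0; i--) {
          long sum = (iv[i] & 0xff) + (counter & 0xff);
          iv[i] = (byte) sum;
          counter = (counter >>> 8) + (sum >>> 8); // propagate the carry
        }
        return iv;
      }

      // Decrypt ciphertext bytes that begin at absolute stream offset pos.
      static byte[] decryptAt(byte[] key, byte[] initIV, long pos,
          byte[] cipherText) throws Exception {
        long counter = pos / BLOCK;        // which keystream block
        int padding = (int) (pos % BLOCK); // offset within that block
        Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
            new IvParameterSpec(ivAtCounter(initIV, counter)));
        // Discard `padding` keystream bytes so decryption lines up with pos.
        cipher.update(new byte[padding]);
        return cipher.update(cipherText);
      }
    }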

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
new file mode 100644
index 0000000..fc07923
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.Syncable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is
+ * required in order to ensure that the plain text and cipher text have a 1:1
+ * mapping. The encryption is buffer-based. The key points of the encryption
+ * are (1) the counter and (2) the padding, both derived from the stream
+ * position:
+ * <p/>
+ * counter = base + pos/(algorithm blocksize); 
+ * padding = pos%(algorithm blocksize); 
+ * <p/>
+ * The underlying stream offset is maintained as state.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class CryptoOutputStream extends FilterOutputStream implements 
+    Syncable, CanSetDropBehind {
+  private final byte[] oneByteBuf = new byte[1]; // per-instance: write(int) mutates it
+  private final CryptoCodec codec;
+  private final Encryptor encryptor;
+  private final int bufferSize;
+  
+  /**
+   * Input data buffer. The data starts at inBuffer.position() and ends at 
+   * inBuffer.limit().
+   */
+  private ByteBuffer inBuffer;
+  
+  /**
+   * Encrypted data buffer. The data starts at outBuffer.position() and ends at 
+   * outBuffer.limit().
+   */
+  private ByteBuffer outBuffer;
+  private long streamOffset = 0; // Underlying stream offset.
+  
+  /**
+   * Padding = pos%(algorithm blocksize); Padding is put into {@link #inBuffer} 
+   * before any other data goes in. The purpose of the padding is to place the
+   * input data at the proper position within the cipher block.
+   */
+  private byte padding;
+  private boolean closed;
+  private final byte[] key;
+  private final byte[] initIV;
+  private byte[] iv;
+  
+  public CryptoOutputStream(OutputStream out, CryptoCodec codec, 
+      int bufferSize, byte[] key, byte[] iv) throws IOException {
+    this(out, codec, bufferSize, key, iv, 0);
+  }
+  
+  public CryptoOutputStream(OutputStream out, CryptoCodec codec, 
+      int bufferSize, byte[] key, byte[] iv, long streamOffset) 
+      throws IOException {
+    super(out);
+    this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize);
+    this.codec = codec;
+    this.key = key.clone();
+    this.initIV = iv.clone();
+    this.iv = iv.clone();
+    inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
+    outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
+    this.streamOffset = streamOffset;
+    try {
+      encryptor = codec.createEncryptor();
+    } catch (GeneralSecurityException e) {
+      throw new IOException(e);
+    }
+    updateEncryptor();
+  }
+  
+  public CryptoOutputStream(OutputStream out, CryptoCodec codec, 
+      byte[] key, byte[] iv) throws IOException {
+    this(out, codec, key, iv, 0);
+  }
+  
+  public CryptoOutputStream(OutputStream out, CryptoCodec codec, 
+      byte[] key, byte[] iv, long streamOffset) throws IOException {
+    this(out, codec, CryptoStreamUtils.getBufferSize(codec.getConf()), 
+        key, iv, streamOffset);
+  }
+  
+  public OutputStream getWrappedStream() {
+    return out;
+  }
+  
+  /**
+   * Encryption is buffer-based.
+   * If there is enough room in {@link #inBuffer}, then write to this buffer.
+   * If {@link #inBuffer} is full, then do encryption and write data to the
+   * underlying stream.
+   * @param b the data.
+   * @param off the start offset in the data.
+   * @param len the number of bytes to write.
+   * @throws IOException
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkStream();
+    if (b == null) {
+      throw new NullPointerException();
+    } else if (off < 0 || len < 0 || off > b.length || 
+        len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    while (len > 0) {
+      final int remaining = inBuffer.remaining();
+      if (len < remaining) {
+        inBuffer.put(b, off, len);
+        len = 0;
+      } else {
+        inBuffer.put(b, off, remaining);
+        off += remaining;
+        len -= remaining;
+        encrypt();
+      }
+    }
+  }
+  
+  /**
+   * Do the encryption: the input is {@link #inBuffer} and the output is 
+   * {@link #outBuffer}.
+   */
+  private void encrypt() throws IOException {
+    Preconditions.checkState(inBuffer.position() >= padding);
+    if (inBuffer.position() == padding) {
+      // There is no real data in the inBuffer.
+      return;
+    }
+    inBuffer.flip();
+    outBuffer.clear();
+    encryptor.encrypt(inBuffer, outBuffer);
+    inBuffer.clear();
+    outBuffer.flip();
+    if (padding > 0) {
+      /*
+       * The plain text and cipher text have a 1:1 mapping, they start at the 
+       * same position.
+       */
+      outBuffer.position(padding);
+      padding = 0;
+    }
+    final int len = outBuffer.remaining();
+    
+    /*
+     * If the underlying stream supports ByteBuffer writes in the future,
+     * this intermediate copy should be refined away.
+     */
+    final byte[] tmp = getTmpBuf();
+    outBuffer.get(tmp, 0, len);
+    out.write(tmp, 0, len);
+    
+    streamOffset += len;
+    if (encryptor.isContextReset()) {
+      /*
+       * This code is generally not executed since the encryptor usually
+       * maintains encryption context (e.g. the counter) internally. However,
+       * some implementations can't maintain context so a re-init is necessary
+       * after each encryption call.
+       */
+      updateEncryptor();
+    }
+  }
+  
+  /** Update the {@link #encryptor}: calculate counter and {@link #padding}. */
+  private void updateEncryptor() throws IOException {
+    final long counter =
+        streamOffset / codec.getCipherSuite().getAlgorithmBlockSize();
+    padding =
+        (byte)(streamOffset % codec.getCipherSuite().getAlgorithmBlockSize());
+    inBuffer.position(padding); // Set proper position for input data.
+    codec.calculateIV(initIV, counter, iv);
+    encryptor.init(key, iv);
+  }
+  
+  private byte[] tmpBuf;
+  private byte[] getTmpBuf() {
+    if (tmpBuf == null) {
+      tmpBuf = new byte[bufferSize];
+    }
+    return tmpBuf;
+  }
+  
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    
+    try {
+      super.close();
+    } finally {
+      freeBuffers();
+      closed = true;
+    }
+  }
+  
+  /**
+   * To flush, encrypt any buffered data and write it to the underlying 
+   * stream, then flush the underlying stream.
+   */
+  @Override
+  public void flush() throws IOException {
+    checkStream();
+    encrypt();
+    super.flush();
+  }
+  
+  @Override
+  public void write(int b) throws IOException {
+    oneByteBuf[0] = (byte)(b & 0xff);
+    write(oneByteBuf, 0, oneByteBuf.length);
+  }
+  
+  private void checkStream() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+  }
+  
+  @Override
+  public void setDropBehind(Boolean dropCache) throws IOException,
+      UnsupportedOperationException {
+    try {
+      ((CanSetDropBehind) out).setDropBehind(dropCache);
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("This stream does not " +
+          "support setting the drop-behind caching.");
+    }
+  }
+
+  @Override
+  @Deprecated
+  public void sync() throws IOException {
+    hflush();
+  }
+
+  @Override
+  public void hflush() throws IOException {
+    flush();
+    if (out instanceof Syncable) {
+      ((Syncable)out).hflush();
+    }
+  }
+
+  @Override
+  public void hsync() throws IOException {
+    flush();
+    if (out instanceof Syncable) {
+      ((Syncable)out).hsync();
+    }
+  }
+  
+  /** Forcibly free the direct buffers. */
+  private void freeBuffers() {
+    CryptoStreamUtils.freeDB(inBuffer);
+    CryptoStreamUtils.freeDB(outBuffer);
+  }
+}
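
A short round-trip usage sketch of the stream pair, assuming the
CryptoCodec.getInstance(Configuration) factory and the matching
CryptoInputStream constructor added earlier in this patch; the in-memory
streams are for illustration only.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.crypto.CryptoCodec;
    import org.apache.hadoop.crypto.CryptoInputStream;
    import org.apache.hadoop.crypto.CryptoOutputStream;
    import org.apache.hadoop.io.IOUtils;

    public class CryptoRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        CryptoCodec codec = CryptoCodec.getInstance(conf);
        byte[] key = new byte[16];
        byte[] iv = new byte[16];
        codec.generateSecureRandom(key);
        codec.generateSecureRandom(iv);

        ByteArrayOutputStream cipherBytes = new ByteArrayOutputStream();
        CryptoOutputStream encOut =
            new CryptoOutputStream(cipherBytes, codec, key, iv);
        byte[] plain = "hello, ctr".getBytes("UTF-8");
        encOut.write(plain);
        encOut.flush(); // encrypt buffered plaintext and write ciphertext

        CryptoInputStream decIn = new CryptoInputStream(
            new ByteArrayInputStream(cipherBytes.toByteArray()),
            codec, key, iv);
        byte[] roundTrip = new byte[plain.length];
        IOUtils.readFully(decIn, roundTrip, 0, roundTrip.length);
        // new String(roundTrip, "UTF-8") equals "hello, ctr"
      }
    }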

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoStreamUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoStreamUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoStreamUtils.java
new file mode 100644
index 0000000..820d775
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoStreamUtils.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Seekable;
+
+import com.google.common.base.Preconditions;
+
+@InterfaceAudience.Private
+public class CryptoStreamUtils {
+  private static final int MIN_BUFFER_SIZE = 512;
+  
+  /** Forcibly free the direct buffer. */
+  public static void freeDB(ByteBuffer buffer) {
+    if (buffer instanceof sun.nio.ch.DirectBuffer) {
+      final sun.misc.Cleaner bufferCleaner =
+          ((sun.nio.ch.DirectBuffer) buffer).cleaner();
+      bufferCleaner.clean();
+    }
+  }
+  
+  /** Read the crypto buffer size from the configuration. */
+  public static int getBufferSize(Configuration conf) {
+    return conf.getInt(HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_KEY, 
+        HADOOP_SECURITY_CRYPTO_BUFFER_SIZE_DEFAULT);
+  }
+  
+  /** Check the minimum and floor the buffer size to a multiple of the algorithm block size. */
+  public static int checkBufferSize(CryptoCodec codec, int bufferSize) {
+    Preconditions.checkArgument(bufferSize >= MIN_BUFFER_SIZE, 
+        "Minimum value of buffer size is " + MIN_BUFFER_SIZE + ".");
+    return bufferSize - bufferSize % codec.getCipherSuite()
+        .getAlgorithmBlockSize();
+  }
+  
+  /**
+   * If the input stream is {@link org.apache.hadoop.fs.Seekable}, return its
+   * current position; otherwise return 0.
+   */
+  public static long getInputStreamOffset(InputStream in) throws IOException {
+    if (in instanceof Seekable) {
+      return ((Seekable) in).getPos();
+    }
+    return 0;
+  }
+}
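
checkBufferSize floors the configured size so that every crypto buffer holds a
whole number of cipher blocks, which keeps the counter/padding arithmetic
simple. A hypothetical mirror of that flooring, for the 16-byte AES block
size:

    final class BufferSizing {
      // Same flooring as CryptoStreamUtils#checkBufferSize above.
      static int floorToBlock(int requested, int blockSize) {
        return requested - requested % blockSize; // floorToBlock(8190, 16) == 8176
      }
    }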

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java
new file mode 100644
index 0000000..9958415
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface Decryptor {
+  
+  /**
+   * Initialize the decryptor and the internal decryption context.
+   * @param key decryption key
+   * @param iv decryption initialization vector
+   * @throws IOException if initialization fails
+   */
+  public void init(byte[] key, byte[] iv) throws IOException;
+  
+  /**
+   * Indicate whether the decryption context is reset.
+   * <p/>
+   * Certain modes, like CTR, require a different IV depending on the 
+   * position in the stream. Generally, the decryptor maintains any necessary
+   * context for calculating the IV and counter so that no reinit is necessary 
+   * during the decryption. Reinit before each operation is inefficient.
+   * @return boolean whether context is reset.
+   */
+  public boolean isContextReset();
+  
+  /**
+   * This presents a direct interface for decrypting with direct ByteBuffers.
+   * <p/>
+   * This function does not always decrypt the entire buffer and may need to 
+   * be called multiple times to process an entire buffer. The object may 
+   * hold the decryption context internally.
+   * <p/>
+   * Some implementations may require sufficient space in the destination 
+   * buffer to decrypt the entire input buffer.
+   * <p/>
+   * Upon return, inBuffer.position() will be advanced by the number of bytes
+   * read and outBuffer.position() by bytes written. Implementations should 
+   * not modify inBuffer.limit() and outBuffer.limit().
+   * <p/>
+   * @param inBuffer a direct {@link ByteBuffer} to read from. inBuffer may 
+   * not be null and inBuffer.remaining() must be > 0
+   * @param outBuffer a direct {@link ByteBuffer} to write to. outBuffer may 
+   * not be null and outBuffer.remaining() must be > 0
+   * @throws IOException if decryption fails
+   */
+  public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) 
+      throws IOException;
+}
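
Because decrypt() may stop short of the end of inBuffer, callers loop until
the input is drained. A sketch of that calling pattern; decryptFully is a
hypothetical helper and assumes outBuffer has room for the entire input.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.crypto.Decryptor;

    final class DecryptorLoop {
      static void decryptFully(Decryptor decryptor, ByteBuffer inBuffer,
          ByteBuffer outBuffer) throws IOException {
        while (inBuffer.remaining() > 0) {
          // Each call advances inBuffer.position() and outBuffer.position();
          // neither limit is modified, per the contract above.
          decryptor.decrypt(inBuffer, outBuffer);
        }
      }
    }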

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java
new file mode 100644
index 0000000..6dc3cfb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface Encryptor {
+  
+  /**
+   * Initialize the encryptor and the internal encryption context.
+   * @param key encryption key.
+   * @param iv encryption initialization vector
+   * @throws IOException if initialization fails
+   */
+  public void init(byte[] key, byte[] iv) throws IOException;
+  
+  /**
+   * Indicate whether the encryption context is reset.
+   * <p/>
+   * Certain modes, like CTR, require a different IV depending on the
+   * position in the stream. Generally, the encryptor maintains any necessary
+   * context for calculating the IV and counter so that no reinit is necessary
+   * during the encryption. Reinit before each operation is inefficient. 
+   * @return boolean whether context is reset.
+   */
+  public boolean isContextReset();
+  
+  /**
+   * This presents a direct interface for encrypting with direct ByteBuffers.
+   * <p/>
+   * This function does not always encrypt the entire buffer and may need to 
+   * be called multiple times to process an entire buffer. The object may 
+   * hold the encryption context internally.
+   * <p/>
+   * Some implementations may require sufficient space in the destination 
+   * buffer to encrypt the entire input buffer.
+   * <p/>
+   * Upon return, inBuffer.position() will be advanced by the number of bytes
+   * read and outBuffer.position() by bytes written. Implementations should
+   * not modify inBuffer.limit() and outBuffer.limit().
+   * <p/>
+   * @param inBuffer a direct {@link ByteBuffer} to read from. inBuffer may 
+   * not be null and inBuffer.remaining() must be > 0
+   * @param outBuffer a direct {@link ByteBuffer} to write to. outBuffer may 
+   * not be null and outBuffer.remaining() must be > 0
+   * @throws IOException if encryption fails
+   */
+  public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) 
+      throws IOException;
+}
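
When isContextReset() returns true after a call, the caller must recompute the
IV for the current stream offset and re-init before encrypting again, which is
what CryptoOutputStream#encrypt does via updateEncryptor(). A sketch of that
pattern; encryptChunk and ivForCurrentOffset are illustrative names, and
computing the offset-specific IV is left to CryptoCodec#calculateIV.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.crypto.Encryptor;

    final class EncryptorReinit {
      static void encryptChunk(Encryptor encryptor, ByteBuffer in,
          ByteBuffer out, byte[] key, byte[] ivForCurrentOffset)
          throws IOException {
        encryptor.encrypt(in, out);
        if (encryptor.isContextReset()) {
          // The implementation discarded its counter state (e.g. via
          // Cipher#doFinal); re-init with an IV recomputed for the
          // current stream offset.
          encryptor.init(key, ivForCurrentOffset);
        }
      }
    }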

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c77bd85b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JceAesCtrCryptoCodec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JceAesCtrCryptoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JceAesCtrCryptoCodec.java
new file mode 100644
index 0000000..61ee743
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/JceAesCtrCryptoCodec.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.security.SecureRandom;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_JCE_PROVIDER_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_JAVA_SECURE_RANDOM_ALGORITHM_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_JAVA_SECURE_RANDOM_ALGORITHM_DEFAULT;
+
+/**
+ * Implement the AES-CTR crypto codec using the JCE provider.
+ */
+@InterfaceAudience.Private
+public class JceAesCtrCryptoCodec extends AesCtrCryptoCodec {
+  private static final Log LOG =
+      LogFactory.getLog(JceAesCtrCryptoCodec.class.getName());
+  
+  private Configuration conf;
+  private String provider;
+  private SecureRandom random;
+
+  public JceAesCtrCryptoCodec() {
+  }
+  
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    provider = conf.get(HADOOP_SECURITY_CRYPTO_JCE_PROVIDER_KEY);
+    final String secureRandomAlg = conf.get(
+        HADOOP_SECURITY_JAVA_SECURE_RANDOM_ALGORITHM_KEY, 
+        HADOOP_SECURITY_JAVA_SECURE_RANDOM_ALGORITHM_DEFAULT);
+    try {
+      random = (provider != null) ? 
+          SecureRandom.getInstance(secureRandomAlg, provider) : 
+            SecureRandom.getInstance(secureRandomAlg);
+    } catch (GeneralSecurityException e) {
+      LOG.warn(e.getMessage());
+      random = new SecureRandom();
+    }
+  }
+
+  @Override
+  public Encryptor createEncryptor() throws GeneralSecurityException {
+    return new JceAesCtrCipher(Cipher.ENCRYPT_MODE, provider);
+  }
+
+  @Override
+  public Decryptor createDecryptor() throws GeneralSecurityException {
+    return new JceAesCtrCipher(Cipher.DECRYPT_MODE, provider);
+  }
+  
+  @Override
+  public void generateSecureRandom(byte[] bytes) {
+    random.nextBytes(bytes);
+  }  
+  
+  private static class JceAesCtrCipher implements Encryptor, Decryptor {
+    private final Cipher cipher;
+    private final int mode;
+    private boolean contextReset = false;
+    
+    public JceAesCtrCipher(int mode, String provider) 
+        throws GeneralSecurityException {
+      this.mode = mode;
+      if (provider == null || provider.isEmpty()) {
+        cipher = Cipher.getInstance(SUITE.getName());
+      } else {
+        cipher = Cipher.getInstance(SUITE.getName(), provider);
+      }
+    }
+
+    @Override
+    public void init(byte[] key, byte[] iv) throws IOException {
+      Preconditions.checkNotNull(key);
+      Preconditions.checkNotNull(iv);
+      contextReset = false;
+      try {
+        cipher.init(mode, new SecretKeySpec(key, "AES"), 
+            new IvParameterSpec(iv));
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    /**
+     * AES-CTR will consume all of the input data. It requires enough space in 
+     * the destination buffer to encrypt the entire input buffer.
+     */
+    @Override
+    public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
+        throws IOException {
+      process(inBuffer, outBuffer);
+    }
+    
+    /**
+     * AES-CTR will consume all of the input data. It requires enough space in
+     * the destination buffer to decrypt the entire input buffer.
+     */
+    @Override
+    public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
+        throws IOException {
+      process(inBuffer, outBuffer);
+    }
+    
+    private void process(ByteBuffer inBuffer, ByteBuffer outBuffer)
+        throws IOException {
+      try {
+        int inputSize = inBuffer.remaining();
+        // Cipher#update will maintain crypto context.
+        int n = cipher.update(inBuffer, outBuffer);
+        if (n < inputSize) {
+          /*
+           * Typically the code will not get here: Cipher#update consumes all 
+           * of the input data and puts the result in outBuffer. 
+           * Cipher#doFinal will reset the crypto context.
+           */
+          contextReset = true;
+          cipher.doFinal(inBuffer, outBuffer);
+        }
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+    
+    @Override
+    public boolean isContextReset() {
+      return contextReset;
+    }
+  }
+}
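
process() leans on the fact that Cipher#update keeps the CTR counter running
across calls, while Cipher#doFinal resets it. A standalone JCE sketch of that
behavior; the all-zero key and IV are for illustration only.

    import java.util.Arrays;
    import javax.crypto.Cipher;
    import javax.crypto.spec.IvParameterSpec;
    import javax.crypto.spec.SecretKeySpec;

    public class UpdateKeepsContext {
      public static void main(String[] args) throws Exception {
        byte[] key = new byte[16];
        byte[] iv = new byte[16];
        Cipher split = Cipher.getInstance("AES/CTR/NoPadding");
        split.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
            new IvParameterSpec(iv));
        byte[] a = split.update(new byte[20]); // first 20 keystream bytes
        byte[] b = split.update(new byte[20]); // next 20, counter carried over

        Cipher whole = Cipher.getInstance("AES/CTR/NoPadding");
        whole.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
            new IvParameterSpec(iv));
        byte[] all = whole.update(new byte[40]);

        byte[] ab = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, ab, a.length, b.length);
        System.out.println(Arrays.equals(all, ab)); // prints true
      }
    }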

