hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject incubator-hawq git commit: HAWQ-1506. Fix multi-append bug of write a encryption zone
Date Fri, 28 Jul 2017 06:25:35 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 54a9af323 -> 2662bebd1


HAWQ-1506. Fix multi-append bug of write a encryption zone


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2662bebd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2662bebd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2662bebd

Branch: refs/heads/master
Commit: 2662bebd163069f5742ff0b236768cd559089f28
Parents: 54a9af3
Author: interma <interma@outlook.com>
Authored: Tue Jul 25 16:50:25 2017 +0800
Committer: rlei <rlei@pivotal.io>
Committed: Fri Jul 28 14:25:38 2017 +0800

----------------------------------------------------------------------
 depends/libhdfs3/mock/MockCryptoCodec.h         |   5 +-
 depends/libhdfs3/src/client/CryptoCodec.cpp     | 319 ++++++++++---------
 depends/libhdfs3/src/client/CryptoCodec.h       |  96 +++---
 .../libhdfs3/src/client/FileEncryptionInfo.h    |   2 +-
 depends/libhdfs3/src/client/HttpClient.cpp      |   2 +
 .../libhdfs3/src/client/OutputStreamImpl.cpp    |  32 +-
 .../libhdfs3/test/function/TestCInterface.cpp   | 160 +++++++++-
 .../libhdfs3/test/function/TestKmsClient.cpp    |   1 -
 .../libhdfs3/test/unit/UnitTestCryptoCodec.cpp  |  16 +-
 .../libhdfs3/test/unit/UnitTestOutputStream.cpp |   2 +-
 10 files changed, 416 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/mock/MockCryptoCodec.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/mock/MockCryptoCodec.h b/depends/libhdfs3/mock/MockCryptoCodec.h
index 4d23e11..a9a220e 100644
--- a/depends/libhdfs3/mock/MockCryptoCodec.h
+++ b/depends/libhdfs3/mock/MockCryptoCodec.h
@@ -30,8 +30,9 @@
 class MockCryptoCodec: public Hdfs::CryptoCodec {
 public:
   MockCryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider>
kcp, int32_t bufSize) : CryptoCodec(encryptionInfo, kcp, bufSize) {}
-  MOCK_METHOD2(encode, std::string(const char * buffer,int64_t size));
-  MOCK_METHOD2(decode, std::string(const char * buffer,int64_t size));
+
+  MOCK_METHOD2(init, int(CryptoMethod crypto_method, int64_t stream_offset));
+  MOCK_METHOD2(cipher_wrap, std::string(const char * buffer,int64_t size));
 };
 
 #endif /* _HDFS_LIBHDFS3_MOCK_CRYPTOCODEC_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/CryptoCodec.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/CryptoCodec.cpp b/depends/libhdfs3/src/client/CryptoCodec.cpp
index 6ba1b74..0ca2d16 100644
--- a/depends/libhdfs3/src/client/CryptoCodec.cpp
+++ b/depends/libhdfs3/src/client/CryptoCodec.cpp
@@ -25,154 +25,181 @@
 
 using namespace Hdfs::Internal;
 
-namespace Hdfs {
-
-/**
- * Construct a CryptoCodec instance.
- * @param encryptionInfo the encryption info of file.
- * @param kcp a KmsClientProvider instance to get key from kms server.
- * @param bufSize crypto buffer size.
- */
-CryptoCodec::CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider>
kcp, int32_t bufSize) : encryptionInfo(encryptionInfo), kcp(kcp), bufSize(bufSize)
-{
-
-    /* Init global status. */
-    ERR_load_crypto_strings();
-    OpenSSL_add_all_algorithms();
-    OPENSSL_config(NULL);
-
-    /* Create cipher context. */
-    encryptCtx = EVP_CIPHER_CTX_new();
-    cipher = NULL;
-
-}
-
-/**
- * Destroy a CryptoCodec instance.
- */
-CryptoCodec::~CryptoCodec()
-{
-    if (encryptCtx)
-        EVP_CIPHER_CTX_free(encryptCtx);
-}
-
-/**
- * Get decrypted key from kms.
- */
-std::string CryptoCodec::getDecryptedKeyFromKms()
-{
-    ptree map = kcp->decryptEncryptedKey(*encryptionInfo);
-    std::string key;
-    try {
-        key = map.get < std::string > ("material");
-    } catch (...) {
-        THROW(HdfsIOException, "CryptoCodec : Can not get key from kms.");
-    }
-
-    int rem = key.length() % 4;
-    if (rem) {
-        rem = 4 - rem;
-        while (rem != 0) {
-            key = key + "=";
-            rem--;
-        }
-    }
-
-    std::replace(key.begin(), key.end(), '-', '+');
-    std::replace(key.begin(), key.end(), '_', '/');
-
-    LOG(INFO, "CryptoCodec : getDecryptedKeyFromKms material is :%s", key.c_str());
-
-    key = KmsClientProvider::base64Decode(key);
-    return key;
-
-	
-}
-
-/**
- * Common encode/decode buffer method.
- * @param buffer the buffer to be encode/decode.
- * @param size the size of buffer.
- * @param enc true is for encode, false is for decode.
- * @return return the encode/decode buffer.
- */
-std::string CryptoCodec::endecInternal(const char * buffer, int64_t size, bool enc)
-{
-    std::string key = encryptionInfo->getKey();
-    std::string iv = encryptionInfo->getIv();
-    LOG(INFO,
-            "CryptoCodec : endecInternal info. key:%s, iv:%s, buffer:%s, size:%d, is_encode:%d.",
-            key.c_str(), iv.c_str(), buffer, size, enc);
-	
-    /* Get decrypted key from KMS */
-    key = getDecryptedKeyFromKms();
-
-    /* Select cipher method based on the key length. */
-    if (key.length() == KEY_LENGTH_256) {
-        cipher = EVP_aes_256_ctr();
-    } else if (key.length() == KEY_LENGTH_128) {
-        cipher = EVP_aes_128_ctr();
-    } else {
-        THROW(InvalidParameter, "CryptoCodec : Invalid key length.");
-    }
-
-    /* Init cipher context with cipher method, encrypted key and IV from KMS. */
-    int encode = enc ? 1 : 0;
-    if (!EVP_CipherInit_ex(encryptCtx, cipher, NULL,
-            (const unsigned char *) key.c_str(),
-            (const unsigned char *) iv.c_str(), encode)) {
-        LOG(WARNING, "EVP_CipherInit_ex failed");
-    }
-    LOG(DEBUG3, "EVP_CipherInit_ex successfully");
-    EVP_CIPHER_CTX_set_padding(encryptCtx, 0);
-
-    /* Encode/decode buffer within cipher context. */
-    std::string result;
-    result.resize(size);
-    int offset = 0;
-    int remaining = size;
-    int len = 0;
-    /* If the encode/decode buffer size larger than crypto buffer size, encode/decode buffer
one by one. */
-    while (remaining > bufSize) {
-        if (!EVP_CipherUpdate(encryptCtx, (unsigned char *) &result[offset],
-                &len, (const unsigned char *) buffer + offset, bufSize)) {
-            std::string err = ERR_lib_error_string(ERR_get_error());
-            THROW(HdfsIOException, "CryptoCodec : Cannot encrypt AES data %s",
-                    err.c_str());
-        }
-        offset += len;
-        remaining -= len;
-        LOG(DEBUG3,
-                "CryptoCodec : EVP_CipherUpdate successfully, result:%s, len:%d",
-                result.c_str(), len);
-    }
-    if (remaining) {
-        if (!EVP_CipherUpdate(encryptCtx, (unsigned char *) &result[offset],
-                &len, (const unsigned char *) buffer + offset, remaining)) {
-            std::string err = ERR_lib_error_string(ERR_get_error());
-            THROW(HdfsIOException, "CryptoCodec : Cannot encrypt AES data %s",
-                    err.c_str());
-        }
-    }
-
-    return result;
-}
 
-/**
- * Encode buffer.
- */
-std::string CryptoCodec::encode(const char * buffer, int64_t size)
-{
-    return endecInternal(buffer, size, true);
-}
+namespace Hdfs {
 
-/**
- * Decode buffer.
- */	
-std::string CryptoCodec::decode(const char * buffer, int64_t size)
-{
-    return endecInternal(buffer, size, false);
-}
+	//copy from java HDFS code
+	std::string CryptoCodec::calculateIV(const std::string& initIV, unsigned long counter)
{
+		char IV[initIV.length()];
+
+		int i = initIV.length(); // IV length
+		int j = 0; // counter bytes index
+		unsigned int sum = 0;
+		while (i-- > 0) {
+			// (sum >>> Byte.SIZE) is the carry for addition
+			sum = (initIV[i] & 0xff) + (sum >> 8);
+			if (j++ < 8) { // Big-endian, and long is 8 bytes length
+				sum += (char) counter & 0xff;
+				counter >>= 8;
+			}
+			IV[i] = (char) sum;
+		}
+
+		return std::string(IV, initIV.length());
+	}
+
+	CryptoCodec::CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider>
kcp, int32_t bufSize) :
+		encryptionInfo(encryptionInfo), kcp(kcp), bufSize(bufSize)
+	{
+
+		// Init global status
+		ERR_load_crypto_strings();
+		OpenSSL_add_all_algorithms();
+		OPENSSL_config(NULL);
+
+		// Create cipher context
+		cipherCtx = EVP_CIPHER_CTX_new();
+		cipher = NULL;
+
+		padding = 0;
+		counter = 0;
+		is_init = false;
+	}
+
+	CryptoCodec::~CryptoCodec()
+	{
+		if (cipherCtx)
+			EVP_CIPHER_CTX_free(cipherCtx);
+	}
+
+	std::string CryptoCodec::getDecryptedKeyFromKms()
+	{
+		ptree map = kcp->decryptEncryptedKey(*encryptionInfo);
+		std::string key;
+		try {
+			key = map.get < std::string > ("material");
+		} catch (...) {
+			THROW(HdfsIOException, "CryptoCodec : Can not get key from kms.");
+		}
+
+		int rem = key.length() % 4;
+		if (rem) {
+			rem = 4 - rem;
+			while (rem != 0) {
+				key = key + "=";
+				rem--;
+			}
+		}
+
+		std::replace(key.begin(), key.end(), '-', '+');
+		std::replace(key.begin(), key.end(), '_', '/');
+
+		LOG(INFO, "CryptoCodec : getDecryptedKeyFromKms material is :%s", key.c_str());
+
+		key = KmsClientProvider::base64Decode(key);
+		return key;
+	}
+
+	int CryptoCodec::init(CryptoMethod crypto_method, int64_t stream_offset) {
+		//check already init
+		if (is_init)
+			return 0;
+
+		// Get decrypted key from KMS
+		std::string key = getDecryptedKeyFromKms();
+
+		// Select cipher method based on the key length
+		uint64_t AlgorithmBlockSize = key.length();
+		if (AlgorithmBlockSize == KEY_LENGTH_256) {
+			cipher = EVP_aes_256_ctr();
+		} else if (AlgorithmBlockSize == KEY_LENGTH_128) {
+			cipher = EVP_aes_128_ctr();
+		} else {
+			LOG(WARNING, "CryptoCodec : Invalid key length.");
+			return -1;
+		}
+
+		//calculate new IV when appending a existed file
+		std::string iv = encryptionInfo->getIv();
+		if (stream_offset > 0) {
+			counter = stream_offset / AlgorithmBlockSize;
+			padding = stream_offset % AlgorithmBlockSize;
+			iv = this->calculateIV(iv, counter);
+		}
+
+		//judge encrypt/decrypt
+		int enc = 0;
+		method = crypto_method;
+		if (method == CryptoMethod::ENCRYPT)
+			enc = 1;
+
+		// Init cipher context with cipher method
+		if (!EVP_CipherInit_ex(cipherCtx, cipher, NULL,
+				(const unsigned char *) key.c_str(), (const unsigned char *) iv.c_str(),
+				enc)) {
+			LOG(WARNING, "EVP_CipherInit_ex failed");
+			return -1;
+		}
+
+		//AES/CTR/NoPadding 
+		EVP_CIPHER_CTX_set_padding(cipherCtx, 0);
+
+		LOG(INFO, "CryptoCodec init success, key_length:%llu, is_encode:%d", AlgorithmBlockSize,
enc);
+		is_init = true;
+		return 1;
+	}
+
+	std::string CryptoCodec::cipher_wrap(const char * buffer, int64_t size) {
+		if (!is_init)
+			THROW(InvalidParameter, "CryptoCodec isn't init");
+
+		int offset = 0;
+		int remaining = size;
+		int len = 0;
+		int ret = 0;
+
+		std::string in_buf(buffer,size);
+		std::string out_buf(size, 0);
+		//set necessary padding when appending a existed file
+		if (padding > 0) {
+			in_buf.insert(0, padding, 0);
+			out_buf.resize(padding+size);
+			remaining += padding;
+		}
+
+		// If the encode/decode buffer size larger than crypto buffer size, encode/decode buffer
one by one
+		while (remaining > bufSize) {
+			ret = EVP_CipherUpdate(cipherCtx, (unsigned char *) &out_buf[offset], &len, 
+				(const unsigned char *)in_buf.data() + offset, bufSize);
+
+			if (!ret) {
+				std::string err = ERR_lib_error_string(ERR_get_error());
+				THROW(HdfsIOException, "CryptoCodec : cipher_wrap AES data failed:%s, crypto_method:%d",
err.c_str(), method);
+			}
+			offset += len;
+			remaining -= len;
+			LOG(DEBUG3, "CryptoCodec : EVP_CipherUpdate successfully, len:%d", len);
+		}
+
+		if (remaining) {
+			ret = EVP_CipherUpdate(cipherCtx, (unsigned char *) &out_buf[offset], &len,
+				(const unsigned char *) in_buf.data() + offset, remaining);
+
+			if (!ret) {
+				std::string err = ERR_lib_error_string(ERR_get_error());
+				THROW(HdfsIOException, "CryptoCodec : cipher_wrap AES data failed:%s, crypto_method:%d",
err.c_str(), method);
+			}
+
+		}
+
+		//cut off padding when necessary
+		if (padding > 0) {
+			out_buf.erase(0, padding);
+			padding = 0;
+		}
+
+		return out_buf;
+	}
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/CryptoCodec.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/CryptoCodec.h b/depends/libhdfs3/src/client/CryptoCodec.h
index e45599b..cae7d3b 100644
--- a/depends/libhdfs3/src/client/CryptoCodec.h
+++ b/depends/libhdfs3/src/client/CryptoCodec.h
@@ -35,54 +35,68 @@
 
 namespace Hdfs {
 
-class CryptoCodec {
-public:
-    /**
-     * Construct a CryptoCodec instance.
-     * @param encryptionInfo the encryption info of file.
-     * @param kcp a KmsClientProvider instance to get key from kms server.
-     * @param bufSize crypto buffer size.
-     */
-    CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp,
int32_t bufSize);
+	enum CryptoMethod {
+		DECRYPT = 0,
+		ENCRYPT = 1
+	};
 
-    /**
-     * Destroy a CryptoCodec instance.
-     */
-    virtual ~CryptoCodec();
+	class CryptoCodec {
+	public:
+		/**
+		 * Construct a CryptoCodec instance.
+		 * @param encryptionInfo the encryption info of file.
+		 * @param kcp a KmsClientProvider instance to get key from kms server.
+		 * @param bufSize crypto buffer size.
+		 */
+		CryptoCodec(FileEncryptionInfo *encryptionInfo, shared_ptr<KmsClientProvider> kcp,
int32_t bufSize);
 
-    /**
-     * Encode buffer.
-     */
-    virtual std::string encode(const char * buffer, int64_t size);
+		/**
+		 * Destroy a CryptoCodec instance.
+		 */
+		virtual ~CryptoCodec();
 
-    /**
-     * Decode buffer.
-     */
-    virtual std::string decode(const char * buffer, int64_t size);
+		/**
+		 * encrypt/decrypt(depends on init()) buffer data
+		 * @param buffer
+		 * @param size
+		 * @return encrypt/decrypt result string
+		 */
+		virtual std::string cipher_wrap(const char * buffer, int64_t size);
 
-private:
+		/**
+		 * init CryptoCodec
+		 * @param method CryptoMethod
+		 * @param stream_offset 0 when open a new file; file_lenght when append a existed file
+		 * @return 1 success; 0 no need(already inited); -1 failed
+		 */
+		virtual int init(CryptoMethod crypto_method, int64_t stream_offset = 0);
 
-    /**
-     * Common encode/decode buffer method.
-     * @param buffer the buffer to be encode/decode.
-     * @param size the size of buffer.
-     * @param enc true is for encode, false is for decode.
-     * @return return the encode/decode buffer.
-     */
-    std::string endecInternal(const char *buffer, int64_t size, bool enc);
+	private:
 
-    /**
-     * Get decrypted key from kms.
-     */
-    std::string getDecryptedKeyFromKms();
+		/**
+		 * Get decrypted key from kms.
+		 */
+		std::string getDecryptedKeyFromKms();
 
-    shared_ptr<KmsClientProvider> kcp;
-    FileEncryptionInfo *encryptionInfo;
-    EVP_CIPHER_CTX *encryptCtx;
-    EVP_CIPHER_CTX *decryptCtx;
-    const EVP_CIPHER *cipher;
-    int32_t bufSize;
-};
+		/**
+		 * calculate new IV for appending a existed file
+		 * @param initIV
+		 * @param counter
+		 * @return new IV string
+		 */
+		std::string calculateIV(const std::string& initIV, unsigned long counter);
+
+		shared_ptr<KmsClientProvider> kcp;
+		FileEncryptionInfo* encryptionInfo;
+		EVP_CIPHER_CTX*     cipherCtx;
+		const EVP_CIPHER*   cipher;
+		CryptoMethod	method;
+
+		bool	is_init;
+		int32_t		bufSize;
+		int64_t		padding;
+		int64_t		counter;
+	};
 
 }
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/FileEncryptionInfo.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/FileEncryptionInfo.h b/depends/libhdfs3/src/client/FileEncryptionInfo.h
index 32ead6c..7584c02 100644
--- a/depends/libhdfs3/src/client/FileEncryptionInfo.h
+++ b/depends/libhdfs3/src/client/FileEncryptionInfo.h
@@ -81,8 +81,8 @@ public:
     }
 
 private:
-    int suite;
     int cryptoProtocolVersion;
+    int suite;
     std::string key;
     std::string iv;
     std::string keyName;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/HttpClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/HttpClient.cpp b/depends/libhdfs3/src/client/HttpClient.cpp
index 562f599..09a74a6 100644
--- a/depends/libhdfs3/src/client/HttpClient.cpp
+++ b/depends/libhdfs3/src/client/HttpClient.cpp
@@ -339,6 +339,8 @@ std::string HttpClient::escape(const std::string &data) {
     } else {
         LOG(WARNING, "HttpClient : Curl in escape method is NULL");
     }
+    std::string empty;
+    return empty;
 }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/src/client/OutputStreamImpl.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/OutputStreamImpl.cpp b/depends/libhdfs3/src/client/OutputStreamImpl.cpp
index 4c5f869..d987295 100644
--- a/depends/libhdfs3/src/client/OutputStreamImpl.cpp
+++ b/depends/libhdfs3/src/client/OutputStreamImpl.cpp
@@ -255,9 +255,18 @@ void OutputStreamImpl::openInternal(shared_ptr<FileSystemInter>
fs, const char *
             FileEncryptionInfo *fileEnInfo = fileStatus.getFileEncryption();
             if (fileStatus.isFileEncrypted()) {
                 if (cryptoCodec == NULL) {
-                    auth = shared_ptr<RpcAuth> (new RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getKmsMethod())));
-                    kcp = shared_ptr<KmsClientProvider> (new KmsClientProvider(auth,
conf));
-                    cryptoCodec = shared_ptr<CryptoCodec> (new CryptoCodec(fileEnInfo,
kcp, conf->getCryptoBufferSize()));
+                    auth = shared_ptr<RpcAuth> (
+                            new RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getKmsMethod())));
+                    kcp = shared_ptr<KmsClientProvider> (
+                            new KmsClientProvider(auth, conf));
+                    cryptoCodec = shared_ptr<CryptoCodec> (
+                            new CryptoCodec(fileEnInfo, kcp, conf->getCryptoBufferSize()));
+
+                    int64_t file_length = fileStatus.getLength();
+                    int ret = cryptoCodec->init(CryptoMethod::ENCRYPT, file_length);
+                    if (ret < 0) {
+                        THROW(HdfsIOException, "init CryptoCodec failed, file:%s", this->path.c_str());
+                    }
                 }
             }
             initAppend();
@@ -278,13 +287,18 @@ void OutputStreamImpl::openInternal(shared_ptr<FileSystemInter>
fs, const char *
     if (fileStatus.isFileEncrypted()) {
         if (cryptoCodec == NULL) {
             auth = shared_ptr<RpcAuth>(
-                    new RpcAuth(fs->getUserInfo(),
-                            RpcAuth::ParseMethod(conf->getKmsMethod())));
+                    new RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getKmsMethod())));
             kcp = shared_ptr<KmsClientProvider>(
                     new KmsClientProvider(auth, conf));
             cryptoCodec = shared_ptr<CryptoCodec>(
-                    new CryptoCodec(fileEnInfo, kcp,
-                            conf->getCryptoBufferSize()));
+                    new CryptoCodec(fileEnInfo, kcp, conf->getCryptoBufferSize()));
+
+            int64_t file_length = fileStatus.getLength();
+            assert(file_length == 0);
+            int ret = cryptoCodec->init(CryptoMethod::ENCRYPT, file_length);
+            if (ret < 0) {
+                THROW(HdfsIOException, "init CryptoCodec failed, file:%s", this->path.c_str());
+            }
         }
     }
     closed = false;
@@ -317,8 +331,10 @@ void OutputStreamImpl::append(const char * buf, int64_t size) {
 void OutputStreamImpl::appendInternal(const char * buf, int64_t size) {
     int64_t todo = size;
 	std::string bufEncode;
+
     if (fileStatus.isFileEncrypted()) {
-		bufEncode = cryptoCodec->encode(buf, size);
+        //encrypt buf
+        bufEncode = cryptoCodec->cipher_wrap(buf, size);
         buf = bufEncode.c_str();
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/function/TestCInterface.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/test/function/TestCInterface.cpp b/depends/libhdfs3/test/function/TestCInterface.cpp
index 56fe07e..40f6a1b 100644
--- a/depends/libhdfs3/test/function/TestCInterface.cpp
+++ b/depends/libhdfs3/test/function/TestCInterface.cpp
@@ -134,6 +134,20 @@ static void bufferMD5(const char* strFilePath, int size, char* result)
{
     }
 }
 
+static void diff_file2buffer(const char *file_path, const char *buf) {
+    std::cout << "diff file: " << file_path << std::endl;
+    char resultFile[33] = { 0 };
+    char resultBuffer[33] = { 0 };
+
+    fileMD5(file_path, resultFile);
+    std::cout << "resultFile is " << resultFile << std::endl;
+
+    bufferMD5(buf, strlen(buf), resultBuffer);
+    std::cout << "resultBuf is " << resultBuffer << std::endl;
+
+    ASSERT_STREQ(resultFile, resultBuffer);
+}
+
 bool CheckFileContent(hdfsFS fs, const char * path, int64_t len, size_t offset) {
     hdfsFile in = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
 
@@ -246,7 +260,6 @@ TEST(TestCInterfaceConnect, TestConnect_Success) {
 TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) {
     hdfsFS fs = NULL;
     hdfsEncryptionZoneInfo * enInfo = NULL;
-    char * uri = NULL;
     setenv("LIBHDFS3_CONF", "function-test.xml", 1);
     struct hdfsBuilder * bld = hdfsNewBuilder();
     assert(bld != NULL);
@@ -256,7 +269,7 @@ TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) {
     system("hadoop fs -rmr /TDE");
     system("hadoop key create keytde");
     system("hadoop fs -mkdir /TDE");
-    ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, "/TDE", "keytde")); 
+    ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, "/TDE", "keytde"));
     enInfo = hdfsGetEZForPath(fs, "/TDE");
     ASSERT_TRUE(enInfo != NULL);
     EXPECT_TRUE(enInfo->mKeyName != NULL);
@@ -274,11 +287,10 @@ TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) {
         system(tdeKey.c_str());
         system(mkTde.c_str());
         ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, tde.c_str(), key.c_str()));
-    } 
-    hdfsEncryptionZoneInfo * enZoneInfos = NULL;
+    }
     int num = 0;
     hdfsListEncryptionZones(fs, &num);
-    EXPECT_EQ(num, 12); 
+    EXPECT_EQ(num, 12);
     ASSERT_EQ(hdfsDisconnect(fs), 0);
     hdfsFreeBuilder(bld);
 }
@@ -286,7 +298,6 @@ TEST(TestCInterfaceTDE, DISABLED_TestCreateEnRPC_Success) {
 TEST(TestCInterfaceTDE, TestAppendWithTDE_Success) {
     hdfsFS fs = NULL;
     hdfsEncryptionZoneInfo * enInfo = NULL;
-    char * uri = NULL;
     setenv("LIBHDFS3_CONF", "function-test.xml", 1);
     struct hdfsBuilder * bld = hdfsNewBuilder();
     assert(bld != NULL);
@@ -327,7 +338,6 @@ TEST(TestCInterfaceTDE, TestAppendWithTDE_Success) {
 TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) {
     hdfsFS fs = NULL;
     hdfsEncryptionZoneInfo * enInfo = NULL;
-    char * uri = NULL;
     setenv("LIBHDFS3_CONF", "function-test.xml", 1);
     struct hdfsBuilder * bld = hdfsNewBuilder();
     assert(bld != NULL);
@@ -348,6 +358,7 @@ TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) {
     const char *tdefile = "/TDE/testfile";
     ASSERT_TRUE(CreateFile(fs, tdefile, 0, 0));
 
+    //case1: append
     int size = 1024 * 32;
     size_t offset = 0;
     hdfsFile out;
@@ -371,15 +382,10 @@ TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) {
     } while (0);
     system("rm -rf ./testfile");
     system("hadoop fs -get /TDE/testfile ./");
-    char resultFile[33] = { 0 };
-    fileMD5("./testfile", resultFile);
-    std::cout << "resultFile is " << resultFile << std::endl;
-    char resultBuffer[33] = { 0 };
-    LOG(INFO, "buffer is %s", &buffer[0]);
-    bufferMD5(&buffer[0], size, resultBuffer);
-    std::cout << "result is " << resultBuffer << std::endl;
-    ASSERT_STREQ(resultFile, resultBuffer);
+    diff_file2buffer("testfile", &buffer[0]);
     system("rm ./testfile");
+
+    //case5: a large file (> 64M) TODO
     system("hadoop fs -rmr /TDE");
     system("hadoop key delete keytde4append -f");
     ASSERT_EQ(hdfsDisconnect(fs), 0);
@@ -387,6 +393,106 @@ TEST(TestCInterfaceTDE, TestAppendWithTDELargeFiles_Success) {
 }
 
 
+TEST(TestCInterfaceTDE, TestAppendMultiTimes_Success) {
+    hdfsFS fs = NULL;
+    hdfsEncryptionZoneInfo * enInfo = NULL;
+    setenv("LIBHDFS3_CONF", "function-test.xml", 1);
+    struct hdfsBuilder * bld = hdfsNewBuilder();
+    assert(bld != NULL);
+    hdfsBuilderSetNameNode(bld, "default");
+    fs = hdfsBuilderConnect(bld);
+    ASSERT_TRUE(fs != NULL);
+
+    //creake iey and encryption zone
+    system("hadoop fs -rmr /TDE");
+    system("hadoop key delete keytde4append -f");
+    system("hadoop key create keytde4append");
+    system("hadoop fs -mkdir /TDE");
+    ASSERT_EQ(0, hdfsCreateEncryptionZone(fs, "/TDE", "keytde4append"));
+    enInfo = hdfsGetEZForPath(fs, "/TDE");
+    ASSERT_TRUE(enInfo != NULL);
+    EXPECT_TRUE(enInfo->mKeyName != NULL);
+    hdfsFreeEncryptionZoneInfo(enInfo, 1);
+
+    hdfsFile out;
+    //case2: close and append
+    const char *tdefile2 = "/TDE/testfile2";
+    char out_data2[] = "12345678";
+    ASSERT_TRUE(CreateFile(fs, tdefile2, 0, 0));
+    out = hdfsOpenFile(fs, tdefile2, O_WRONLY | O_APPEND, 0, 0, 0);
+    hdfsWrite(fs, out, out_data2, 4);
+    hdfsCloseFile(fs, out);
+
+    out = hdfsOpenFile(fs, tdefile2, O_WRONLY | O_APPEND, 0, 0, 0);
+    hdfsWrite(fs, out, out_data2+4, 4);
+    hdfsCloseFile(fs, out);
+    system("rm ./testfile2");
+    system("hadoop fs -get /TDE/testfile2 ./");
+    diff_file2buffer("testfile2", out_data2);
+
+    //case3: multi-append
+    const char *tdefile3 = "/TDE/testfile3";
+    char out_data3[] = "1234567812345678123456781234567812345678123456781234567812345678";
//16*4byte
+    ASSERT_TRUE(CreateFile(fs, tdefile3, 0, 0));
+    out = hdfsOpenFile(fs, tdefile3, O_WRONLY | O_APPEND, 0, 0, 0);
+    hdfsWrite(fs, out, out_data3, 5);
+    hdfsWrite(fs, out, out_data3+5, 28);
+    hdfsWrite(fs, out, out_data3+33, 15);
+    hdfsWrite(fs, out, out_data3+48, 16);
+    hdfsCloseFile(fs, out);
+    system("rm ./testfile3");
+    system("hadoop fs -get /TDE/testfile3 ./");
+    diff_file2buffer("testfile3", out_data3);
+
+
+    //case4: multi-append > bufsize(8k)
+    const char *tdefile4 = "/TDE/testfile4";
+    int data_size = 13*1024+1;
+    char *out_data4 = (char *)malloc(data_size);
+    Hdfs::FillBuffer(out_data4, data_size-1, 1024);
+    out_data4[data_size-1] = 0;
+    ASSERT_TRUE(CreateFile(fs, tdefile4, 0, 0));
+    out = hdfsOpenFile(fs, tdefile4, O_WRONLY | O_APPEND, 0, 0, 0);
+
+    int todo = 0;
+    int offset = 0;
+    todo = 9*1024-1;
+    while (todo > 0) {
+        int rc = 0;
+        if (0 > (rc = hdfsWrite(fs, out, out_data4+offset, todo))) {
+            break;
+        }
+        todo -= rc;
+        offset += rc;
+    }
+
+    todo = 4*1024+1;
+    while (todo > 0) {
+        int rc = 0;
+        if (0 > (rc = hdfsWrite(fs, out, out_data4+offset, todo))) {
+            break;
+        }
+        todo -= rc;
+        offset += rc;
+    }
+
+
+    ASSERT_EQ(data_size-1, offset);
+
+    hdfsCloseFile(fs, out);
+    system("rm ./testfile4");
+    system("hadoop fs -get /TDE/testfile4 ./");
+    diff_file2buffer("testfile4", out_data4);
+    free(out_data4);
+
+
+
+    system("hadoop fs -rmr /TDE");
+    system("hadoop key delete keytde4append -f");
+    ASSERT_EQ(hdfsDisconnect(fs), 0);
+    hdfsFreeBuilder(bld);
+}
+
 TEST(TestErrorMessage, TestErrorMessage) {
     EXPECT_NO_THROW(hdfsGetLastError());
     hdfsChown(NULL, TEST_HDFS_PREFIX, NULL, NULL);
@@ -1773,3 +1879,27 @@ TEST_F(TestCInterface, TestGetHosts_Success) {
     hdfsFreeHosts(hosts);
     hdfsCloseFile(fs, out);
 }
+
+// test concurrent write to a same file
+// expected:
+//  At any point there can only be 1 writer.
+//  This is enforced by requiring the writer to acquire leases.
+TEST_F(TestCInterface, TestConcurrentWrite_Failure) {
+    hdfsFS fs = NULL;
+    setenv("LIBHDFS3_CONF", "function-test.xml", 1);
+    struct hdfsBuilder * bld = hdfsNewBuilder();
+    assert(bld != NULL);
+    hdfsBuilderSetNameNode(bld, "default");
+    fs = hdfsBuilderConnect(bld);
+    ASSERT_TRUE(fs != NULL);
+
+    const char *file_path = BASE_DIR "/concurrent_write";
+    char buf[] = "1234";
+    hdfsFile fout1 = hdfsOpenFile(fs, file_path, O_WRONLY | O_APPEND, 0, 0, 0);
+    hdfsFile fout2 = hdfsOpenFile(fs, file_path, O_WRONLY | O_APPEND, 0, 0, 0);
+    ASSERT_TRUE(fout2 == NULL); //must failed
+    int rc = hdfsWrite(fs, fout1, buf, sizeof(buf)-1);
+    ASSERT_TRUE(rc > 0);
+    int retval = hdfsCloseFile(fs, fout1);
+    ASSERT_TRUE(retval == 0);
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/function/TestKmsClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/test/function/TestKmsClient.cpp b/depends/libhdfs3/test/function/TestKmsClient.cpp
index d997f88..21280de 100644
--- a/depends/libhdfs3/test/function/TestKmsClient.cpp
+++ b/depends/libhdfs3/test/function/TestKmsClient.cpp
@@ -186,7 +186,6 @@ TEST_F(TestKmsClient, DecryptEncryptedKeySuccess) {
 TEST_F(TestKmsClient, CreateKeyFailediBadUrl) {
     std::string keyName = "testcreatekeyfailname";
     std::string cipher = "AES/CTR/NoPadding";
-    int length = 128;
     std::string material = "testCreateKey";
 
     std::string url[4] = { "ftp:///http@localhost:16000/kms",

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp b/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp
index 36c67b1..92e9403 100644
--- a/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp
+++ b/depends/libhdfs3/test/unit/UnitTestCryptoCodec.cpp
@@ -111,7 +111,8 @@ TEST_F(TestCryptoCodec, encode_Success) {
 
     //char buf[1024] = "encode hello world";
     char buf[1024];
-    Hdfs::FillBuffer(buf, sizeof(buf), 2048);
+    Hdfs::FillBuffer(buf, sizeof(buf)-1, 2048);
+    buf[sizeof(buf)-1] = 0;
 
     int32_t bufSize = 1024;
 
@@ -121,13 +122,20 @@ TEST_F(TestCryptoCodec, encode_Success) {
         encryptionInfo.setKey(Key[i]);
         shared_ptr<MockHttpClient> hc(new MockHttpClient());
         kcp->setHttpClient(hc);
-        CryptoCodec es(&encryptionInfo, kcp, bufSize);
+
         EXPECT_CALL(*kcp, decryptEncryptedKey(_)).Times(2).WillRepeatedly(
                 Return(kcp->getEDKResult(encryptionInfo)));
-        std::string encodeStr = es.encode(buf, strlen(buf));
+
+        CryptoCodec es(&encryptionInfo, kcp, bufSize);
+        es.init(CryptoMethod::ENCRYPT);
+        CryptoCodec ds(&encryptionInfo, kcp, bufSize);
+        ds.init(CryptoMethod::DECRYPT);
+
+
+        std::string encodeStr = es.cipher_wrap(buf, strlen(buf));
         ASSERT_NE(0, memcmp(buf, encodeStr.c_str(), strlen(buf)));
 
-        std::string decodeStr = es.decode(encodeStr.c_str(), strlen(buf));
+        std::string decodeStr = ds.cipher_wrap(encodeStr.c_str(), strlen(buf));
         ASSERT_STREQ(decodeStr.c_str(), buf);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2662bebd/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp b/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp
index b8b6a46..de36eac 100644
--- a/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp
+++ b/depends/libhdfs3/test/unit/UnitTestOutputStream.cpp
@@ -406,7 +406,7 @@ TEST_F(TestOutputStream, appendEncryption_Success) {
     EXPECT_CALL(*pipelineStub, close(_)).Times(2).WillOnce(Return(lastBlock)).WillOnce(Return(lastBlock));
     EXPECT_CALL(*fs, fsync(_)).Times(2);
     std::string bufferEn;
-    EXPECT_CALL(*cryptoC, encode(_,_)).Times(1).WillOnce(Return(bufferEn));
+    EXPECT_CALL(*cryptoC, cipher_wrap(_,_)).Times(1).WillOnce(Return(bufferEn));
     EXPECT_NO_THROW(ous.append(buffer, sizeof(buffer)));
     EXPECT_CALL(*pipelineStub, close(_)).Times(1).WillOnce(Return(lastBlock));
     EXPECT_CALL(*fs, fsync(_)).Times(1);


Mime
View raw message