hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1611413 [15/18] - in /hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nati...
Date Thu, 17 Jul 2014 17:45:01 GMT
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include "commons.h"
+#include "BufferStream.h"
+#include "FileSystem.h"
+#include "IFile.h"
+#include "test_commons.h"
+
+SingleSpillInfo * writeIFile(int partition, vector<pair<string, string> > & kvs,
+    const string & path, KeyValueType type, const string & codec) {
+  FileOutputStream * fout = (FileOutputStream*)FileSystem::getLocal().create(path);
+  IFileWriter * iw = new IFileWriter(fout, CHECKSUM_CRC32, type, type, codec, NULL);
+  for (int i = 0; i < partition; i++) {
+    iw->startPartition();
+    for (size_t i = 0; i < kvs.size(); i++) {
+      pair<string, string> & p = kvs[i];
+      iw->write(p.first.c_str(), p.first.length(), p.second.c_str(), p.second.length());
+    }
+    iw->endPartition();
+  }
+  SingleSpillInfo * info = iw->getSpillInfo();
+  delete iw;
+  delete fout;
+  return info;
+}
+
+void readIFile(vector<pair<string, string> > & kvs, const string & path, KeyValueType type,
+    SingleSpillInfo * info, const string & codec) {
+  FileInputStream * fin = (FileInputStream*)FileSystem::getLocal().open(path);
+  IFileReader * ir = new IFileReader(fin, info);
+  while (ir->nextPartition()) {
+    const char * key, *value;
+    uint32_t keyLen, valueLen;
+    while (NULL != (key = ir->nextKey(keyLen))) {
+      value = ir->value(valueLen);
+      string keyS(key, keyLen);
+      string valueS(value, valueLen);
+      kvs.push_back(std::make_pair(keyS, valueS));
+    }
+  }
+  delete ir;
+  delete fin;
+}
+
+void TestIFileReadWrite(KeyValueType kvtype, int partition, int size,
+    vector<pair<string, string> > & kvs, const string & codec = "") {
+  string outputpath = "ifilewriter";
+  SingleSpillInfo * info = writeIFile(partition, kvs, outputpath, kvtype, codec);
+  LOG("write finished");
+  vector<pair<string, string> > readkvs;
+  readIFile(readkvs, outputpath, kvtype, info, codec);
+  LOG("read finished");
+  ASSERT_EQ(kvs.size() * partition, readkvs.size());
+  for (int i = 0; i < partition; i++) {
+    vector<pair<string, string> > cur_part(readkvs.begin() + i * kvs.size(),
+        readkvs.begin() + (i + 1) * kvs.size());
+    ASSERT_EQ(kvs.size(), cur_part.size());
+//    for (size_t j=0;j<kvs.size();j++) {
+//      SCOPED_TRACE(j);
+//      ASSERT_EQ(kvs[j], cur_part[j]);
+//    }
+    ASSERT_EQ(kvs, cur_part);
+  }
+  FileSystem::getLocal().remove(outputpath);
+}
+
+TEST(IFile, WriteRead) {
+  int partition = TestConfig.getInt("ifile.partition", 7);
+  int size = TestConfig.getInt("partition.size", 20000);
+  vector<pair<string, string> > kvs;
+  Generate(kvs, size, "bytes");
+  TestIFileReadWrite(TextType, partition, size, kvs);
+  TestIFileReadWrite(BytesType, partition, size, kvs);
+  TestIFileReadWrite(UnknownType, partition, size, kvs);
+  TestIFileReadWrite(TextType, partition, size, kvs, "org.apache.hadoop.io.compress.SnappyCodec");
+}
+
+void TestIFileWriteRead2(vector<pair<string, string> > & kvs, char * buff, size_t buffsize,
+    const string & codec, ChecksumType checksumType, KeyValueType type) {
+  int partition = TestConfig.getInt("ifile.partition", 50);
+  Timer timer;
+  OutputBuffer outputBuffer = OutputBuffer(buff, buffsize);
+  IFileWriter * iw = new IFileWriter(&outputBuffer, checksumType, type, type, codec, NULL);
+  timer.reset();
+  for (int i = 0; i < partition; i++) {
+    iw->startPartition();
+    for (size_t j = 0; j < kvs.size(); j++) {
+      iw->write(kvs[j].first.c_str(), kvs[j].first.length(), kvs[j].second.c_str(),
+          kvs[j].second.length());
+    }
+    iw->endPartition();
+  }
+  SingleSpillInfo * info = iw->getSpillInfo();
+  LOG("%s",
+      timer.getSpeedM2("Write data", info->getEndPosition(), info->getRealEndPosition()).c_str());
+  delete iw;
+
+  InputBuffer inputBuffer = InputBuffer(buff, outputBuffer.tell());
+  IFileReader * ir = new IFileReader(&inputBuffer, info);
+  timer.reset();
+  while (ir->nextPartition()) {
+    const char * key, *value;
+    uint32_t keyLen, valueLen;
+    while (NULL != (key = ir->nextKey(keyLen))) {
+      value = ir->value(valueLen);
+    }
+  }
+  LOG("%s",
+      timer.getSpeedM2(" Read data", info->getEndPosition(), info->getRealEndPosition()).c_str());
+  delete ir;
+  delete info;
+}
+
+
+
+TEST(Perf, IFile) {
+  int size = TestConfig.getInt("partition.size", 20000);
+  string codec = TestConfig.get("ifile.codec", "");
+  string type = TestConfig.get("ifile.type", "bytes");
+
+  vector<pair<string, string> > kvs;
+  Generate(kvs, size, type);
+  std::sort(kvs.begin(), kvs.end());
+
+  size_t buffsize = 200 * 1024 * 1024;
+  char * buff = new char[buffsize];
+  memset(buff, 0, buffsize);
+
+  LOG("Test TextType CRC32");
+  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, TextType);
+  LOG("Test BytesType CRC32");
+  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, BytesType);
+  LOG("Test UnknownType CRC32");
+  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, UnknownType);
+  LOG("Test TextType CRC32C");
+  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, TextType);
+  LOG("Test BytesType CRC32C");
+  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, BytesType);
+  LOG("Test UnknownType CRC32C");
+  TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, UnknownType);
+  delete[] buff;
+}
+
+// The Glibc has a bug in the file tell api, it will overwrite the file data 
+// unexpected. 
+// Please check https://rhn.redhat.com/errata/RHBA-2013-0279.html
+// This case is to check wether the bug exists.
+// If it exists, it means you need to upgrade the glibc.
+TEST(IFile, TestGlibCBug) {
+  std::string path("./testData/testGlibCBugSpill.out");
+
+  uint32_t expect[5] = {-1538241715, -1288088794, -192294464, 563552421, 1661521654};
+
+  LOG("TestGlibCBug %s", path.c_str());
+  IFileSegment * segments = new IFileSegment [1];
+  segments[0].realEndOffset = 10000000;
+  SingleSpillInfo * info = new SingleSpillInfo(segments, 1, path, CHECKSUM_NONE,
+      IntType, TextType, "");
+
+  InputStream * fileOut = FileSystem::getLocal().open(path);
+  IFileReader * reader = new IFileReader(fileOut, info, true);
+
+  const char * key = NULL;
+  uint32_t length = 0;
+  reader->nextPartition();
+  uint32_t index = 0;
+  while(NULL != (key = reader->nextKey(length))) {
+    int realKey = bswap(*(uint32_t *)(key));
+    ASSERT_EQ(expect[index], realKey);
+    index++;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestMain.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestMain.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestMain.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestMain.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <signal.h>
+
+#ifndef __CYGWIN__
+#include <execinfo.h>
+#endif
+
+#include <stdexcept>
+#include "commons.h"
+#include "Buffers.h"
+#include "FileSystem.h"
+#include "test_commons.h"
+
+extern "C" {
+
+static void handler(int sig);
+
+// TODO: just for debug, should be removed
+void handler(int sig) {
+  void *array[10];
+  size_t size;
+
+  // print out all the frames to stderr
+  fprintf(stderr, "Error: signal %d:\n", sig);
+
+#ifndef __CYGWIN__
+  // get void*'s for all entries on the stack
+  size = backtrace(array, 10);
+
+  backtrace_symbols_fd(array, size, 2);
+#endif
+
+  exit(1);
+}
+}
+
+using namespace NativeTask;
+
+typedef char * CString;
+
+int main(int argc, char ** argv) {
+  signal(SIGSEGV, handler);
+  CString * newArgv = new CString[argc + 1];
+  memcpy(newArgv, argv, argc * sizeof(CString));
+
+  bool gen = false;
+  if (argc > 1) {
+    if (string("perf") == newArgv[1]) {
+      newArgv[1] = (char *)"--gtest_filter=Perf.*";
+    } else if (string("noperf") == newArgv[1]) {
+      newArgv[1] = (char *)"--gtest_filter=-Perf.*";
+    } else if (string("gen") == newArgv[1]) {
+      gen = true;
+    }
+  }
+  testing::InitGoogleTest(&argc, newArgv);
+  if (argc > 0) {
+    int skip = gen ? 2 : 1;
+    TestConfig.parse(argc - skip, (const char **)(newArgv + skip));
+  }
+  try {
+    if (gen == true) {
+      string type = TestConfig.get("generate.type", "word");
+      string codec = TestConfig.get("generate.codec", "");
+      int64_t len = TestConfig.getInt("generate.length", 1024);
+      string temp;
+      GenerateKVTextLength(temp, len, type);
+      if (codec.length() == 0) {
+        fprintf(stdout, "%s", temp.c_str());
+      } else {
+        OutputStream * fout = FileSystem::getLocal().create("/dev/stdout");
+        AppendBuffer app = AppendBuffer();
+        app.init(128 * 1024, fout, codec);
+        app.write(temp.data(), temp.length());
+        fout->close();
+        delete fout;
+      }
+      return 0;
+    } else {
+      return RUN_ALL_TESTS();
+    }
+  } catch (std::exception & e) {
+    fprintf(stderr, "Exception: %s", e.what());
+    return 1;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestPrimitives.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestPrimitives.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestPrimitives.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestPrimitives.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "test_commons.h"
+
+TEST(Primitives, fmemcmp) {
+  std::vector<std::string> vs;
+  char buff[14];
+  vs.push_back("");
+  for (uint32_t i = 0; i < 5000; i += 7) {
+    snprintf(buff, 14, "%d", i * 31);
+    vs.push_back(buff);
+    snprintf(buff, 10, "%010d", i);
+    vs.push_back(buff);
+  }
+  for (size_t i = 0; i < vs.size(); i++) {
+    for (size_t j = 0; j < vs.size(); j++) {
+      std::string & ls = vs[i];
+      std::string & rs = vs[j];
+      size_t m = std::min(ls.length(), rs.length());
+      int c = memcmp(ls.c_str(), rs.c_str(), m);
+      int t = fmemcmp(ls.c_str(), rs.c_str(), m);
+      if (!((c == 0 && t == 0) || (c > 0 && t > 0) || (c < 0 && t < 0))) {
+        ASSERT_TRUE(false);
+      }
+    }
+  }
+}
+
+static int test_memcmp() {
+  uint8_t buff[2048];
+  for (uint32_t i = 0; i < 2048; i++) {
+    buff[i] = i & 0xff;
+  }
+  std::random_shuffle(buff, buff + 2048);
+  int r = 0;
+  for (uint32_t i = 0; i < 100000000; i++) {
+    int offset = i % 1000;
+    r += memcmp(buff, buff + 1024, 5);
+    r += memcmp(buff + offset, buff + 1124, 9);
+    r += memcmp(buff + offset, buff + 1224, 10);
+    r += memcmp(buff + offset, buff + 1324, 15);
+    r += memcmp(buff + offset, buff + 1424, 16);
+    r += memcmp(buff + offset, buff + 1524, 17);
+    r += memcmp(buff + offset, buff + 1624, 18);
+    r += memcmp(buff + offset, buff + 1724, 19);
+  }
+  return r;
+}
+
+static int test_fmemcmp() {
+  char buff[2048];
+  for (uint32_t i = 0; i < 2048; i++) {
+    buff[i] = i & 0xff;
+  }
+  std::random_shuffle(buff, buff + 2048);
+  int r = 0;
+  for (uint32_t i = 0; i < 100000000; i++) {
+    int offset = i % 1000;
+    r += fmemcmp(buff, buff + 1024, 5);
+    r += fmemcmp(buff + offset, buff + 1124, 9);
+    r += fmemcmp(buff + offset, buff + 1224, 10);
+    r += fmemcmp(buff + offset, buff + 1324, 15);
+    r += fmemcmp(buff + offset, buff + 1424, 16);
+    r += fmemcmp(buff + offset, buff + 1524, 17);
+    r += fmemcmp(buff + offset, buff + 1624, 18);
+    r += fmemcmp(buff + offset, buff + 1724, 19);
+  }
+  return r;
+}
+
+TEST(Perf, fmemcmp) {
+  Timer t;
+  int a = test_memcmp();
+  LOG("%s", t.getInterval(" memcmp ").c_str());
+  t.reset();
+  int b = test_fmemcmp();
+  LOG("%s", t.getInterval(" fmemcmp ").c_str());
+  // prevent compiler optimization
+  TestConfig.setInt("tempvalue", a + b);
+}
+
+static void test_memcpy_perf_len(char * src, char * dest, size_t len, size_t time) {
+  for (size_t i = 0; i < time; i++) {
+    memcpy(src, dest, len);
+    memcpy(dest, src, len);
+  }
+}
+
+static void test_simple_memcpy_perf_len(char * src, char * dest, size_t len, size_t time) {
+  for (size_t i = 0; i < time; i++) {
+    simple_memcpy(src, dest, len);
+    simple_memcpy(dest, src, len);
+  }
+}
+
+TEST(Perf, simple_memcpy_small) {
+  char * src = new char[10240];
+  char * dest = new char[10240];
+  char buff[32];
+  for (size_t len = 1; len < 256; len = len + 2) {
+    LOG("------------------------------");
+    snprintf(buff, 32, "       memcpy %luB\t", len);
+    Timer t;
+    test_memcpy_perf_len(src, dest, len, 1000000);
+    LOG("%s", t.getInterval(buff).c_str());
+    snprintf(buff, 32, "simple_memcpy %luB\t", len);
+    t.reset();
+    test_simple_memcpy_perf_len(src, dest, len, 1000000);
+    LOG("%s", t.getInterval(buff).c_str());
+  }
+  delete[] src;
+  delete[] dest;
+}
+
+inline char * memchrbrf4(char * p, char ch, size_t len) {
+  ssize_t i = 0;
+  for (; i < ((ssize_t)len) - 3; i += 3) {
+    if (p[i] == ch) {
+      return p + i;
+    }
+    if (p[i + 1] == ch) {
+      return p + i + 1;
+    }
+    if (p[i + 2] == ch) {
+      return p + i + 2;
+    }
+  }
+  for (; i < len; i++) {
+    if (p[i] == ch) {
+      return p + i;
+    }
+  }
+  return NULL;
+}
+
+inline char * memchrbrf2(char * p, char ch, size_t len) {
+  for (size_t i = 0; i < len / 2; i += 2) {
+    if (p[i] == ch) {
+      return p + i;
+    }
+    if (p[i + 1] == ch) {
+      return p + i + 1;
+    }
+  }
+  if (len % 2 && p[len - 1] == ch) {
+    return p + len - 1;
+  }
+  return NULL;
+}
+
+// not safe in MACOSX, segment fault, should be safe on Linux with out mmap
+inline int memchr_sse(const char *s, int c, int len) {
+  //len : edx; c: esi; s:rdi
+  int index = 0;
+
+#ifdef __X64
+
+  __asm__ __volatile__(
+      //"and $0xff, %%esi;" //clear upper bytes
+      "movd %%esi, %%xmm1;"
+
+      "mov $1, %%eax;"
+      "add $16, %%edx;"
+      "mov %%rdi ,%%r8;"
+
+      "1:"
+      "movdqu (%%rdi), %%xmm2;"
+      "sub $16, %%edx;"
+      "addq $16, %%rdi;"
+      //"pcmpestri $0x0, %%xmm2,%%xmm1;"
+      ".byte 0x66 ,0x0f ,0x3a ,0x61 ,0xca ,0x00;"
+      //"lea 16(%%rdi), %%rdi;"
+      "ja 1b;"//Res2==0:no match and zflag==0: s is not end
+      "jc 3f;"//Res2==1: match and s is not end
+
+      "mov $0xffffffff, %%eax;"//no match
+      "jmp 0f;"
+
+      "3:"
+      "sub %%r8, %%rdi;"
+      "lea -16(%%edi,%%ecx),%%eax;"
+
+      "0:"
+      //        "mov %%eax, %0;"
+      :"=a"(index),"=D"(s),"=S"(c),"=d"(len)
+      :"D"(s),"S"(c),"d"(len)
+      :"rcx","r8","memory"
+  );
+
+#endif
+
+  return index;
+}
+
+TEST(Perf, memchr) {
+  Random r;
+  int32_t size = 100 * 1024 * 1024;
+  int32_t lineLength = TestConfig.getInt("memchr.line.length", 100);
+  char * buff = new char[size + 16];
+  memset(buff, 'a', size);
+  for (int i = 0; i < size / lineLength; i++) {
+    buff[r.next_int32(size)] = '\n';
+  }
+  Timer timer;
+  char * pos = buff;
+  int count = 0;
+  while (true) {
+    if (pos == buff + size) {
+      break;
+    }
+    pos = (char*)memchr(pos, '\n', buff + size - pos);
+    if (pos == NULL) {
+      break;
+    }
+    pos++;
+    count++;
+  }
+  LOG("%s", timer.getSpeedM2("memchr bytes/lines", size, count).c_str());
+  timer.reset();
+  pos = buff;
+  count = 0;
+  while (true) {
+    if (pos == buff + size) {
+      break;
+    }
+    pos = (char*)memchrbrf2(pos, '\n', buff + size - pos);
+    if (pos == NULL) {
+      break;
+    }
+    pos++;
+    count++;
+  }
+  LOG("%s", timer.getSpeedM2("memchrbrf2 bytes/lines", size, count).c_str());
+  timer.reset();
+  pos = buff;
+  count = 0;
+  while (true) {
+    if (pos == buff + size) {
+      break;
+    }
+    pos = (char*)memchrbrf4(pos, '\n', buff + size - pos);
+    if (pos == NULL) {
+      break;
+    }
+    pos++;
+    count++;
+  }
+  LOG("%s", timer.getSpeedM2("memchrbrf4 bytes/lines", size, count).c_str());
+  timer.reset();
+  pos = buff;
+  count = 0;
+  while (true) {
+    if (pos == buff + size) {
+      break;
+    }
+    int ret = memchr_sse(pos, '\n', buff + size - pos);
+    if (ret == -1) {
+      break;
+    }
+    pos = pos + ret;
+    pos++;
+    count++;
+  }
+  LOG("%s", timer.getSpeedM2("memchr_sse bytes/lines", size, count).c_str());
+  delete[] buff;
+}
+
+TEST(Perf, memcpy_batch) {
+  int32_t size = TestConfig.getInt("input.size", 64 * 1024);
+  size_t mb = TestConfig.getInt("input.mb", 320) * 1024 * 1024UL;
+  char * src = new char[size];
+  char * dest = new char[size];
+  memset(src, 0, size);
+  memset(dest, 0, size);
+  Timer t;
+  for (size_t i = 0; i < mb; i += size) {
+    memcpy(dest, src, size);
+  }
+  LOG("%s", t.getSpeedM("memcpy", mb).c_str());
+  t.reset();
+  for (size_t i = 0; i < mb; i += size) {
+    simple_memcpy(dest, src, size);
+  }
+  LOG("%s", t.getSpeedM("simple_memcpy", mb).c_str());
+  delete[] src;
+  delete[] dest;
+}
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestSort.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestSort.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestSort.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestSort.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "Streams.h"
+#include "Buffers.h"
+#include "DualPivotQuickSort.h"
+#include "test_commons.h"
+
+string gBuffer;
+
+inline const char * get_position(uint32_t offset) {
+  return gBuffer.data() + offset;
+}
+
+/**
+ * fast memcmp
+ */
+inline int fmemcmporig(const char * src, const char * dest, uint32_t len) {
+  const uint64_t * src8 = (const uint64_t*)src;
+  const uint64_t * dest8 = (const uint64_t*)dest;
+  while (len >= 8) {
+    uint64_t l = *src8;
+    uint64_t r = *dest8;
+    if (l != r) {
+      l = bswap64(l);
+      r = bswap64(r);
+      return l > r ? 1 : -1;
+    }
+    ++src8;
+    ++dest8;
+    len -= 8;
+  }
+  if (len == 0)
+    return 0;
+  if (len == 1) {
+    int l = (int)(*(uint8_t*)src8);
+    int r = (int)(*(uint8_t*)dest8);
+    return l - r;
+  }
+  uint64_t mask = (1ULL << (len * 8)) - 1;
+  uint64_t l = (*src8) & mask;
+  uint64_t r = (*dest8) & mask;
+  if (l == r) {
+    return 0;
+  }
+  l = bswap64(l);
+  r = bswap64(r);
+  return l > r ? 1 : -1;
+}
+
+/**
+ * c qsort compare function
+ */
+static int compare_offset(const void * plh, const void * prh) {
+  KVBuffer * lhb = (KVBuffer*)get_position(*(uint32_t*)plh);
+  KVBuffer * rhb = (KVBuffer*)get_position(*(uint32_t*)prh);
+  uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+  int ret = memcmp(lhb->getKey(), rhb->getKey(), minlen);
+  if (ret) {
+    return ret;
+  }
+  return lhb->keyLength - rhb->keyLength;
+}
+
+/**
+ * dualpivot sort compare function
+ */
+class CompareOffset {
+public:
+  int64_t operator()(uint32_t lhs, uint32_t rhs) {
+
+    KVBuffer * lhb = (KVBuffer*)get_position(lhs);
+    KVBuffer * rhb = (KVBuffer*)get_position(rhs);
+
+    uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+    int64_t ret = memcmp(lhb->getKey(), rhb->getKey(), minlen);
+    if (ret) {
+      return ret;
+    }
+    return lhb->keyLength - rhb->keyLength;
+  }
+};
+
+/**
+ * quicksort compare function
+ */
+class OffsetLessThan {
+public:
+  bool operator()(uint32_t lhs, uint32_t rhs) {
+    KVBuffer * lhb = (KVBuffer*)get_position(lhs);
+    KVBuffer * rhb = (KVBuffer*)get_position(rhs);
+
+    uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+    int64_t ret = memcmp(lhb->content, rhb->content, minlen);
+    return ret < 0 || (ret == 0 && (lhb->keyLength < rhb->keyLength));
+  }
+};
+
+/**
+ * c qsort compare function
+ */
+static int compare_offset2(const void * plh, const void * prh) {
+
+  KVBuffer * lhb = (KVBuffer*)get_position(*(uint32_t*)plh);
+  KVBuffer * rhb = (KVBuffer*)get_position(*(uint32_t*)prh);
+
+  uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+  int64_t ret = fmemcmp(lhb->content, rhb->content, minlen);
+  if (ret) {
+    return ret;
+  }
+  return lhb->keyLength - rhb->keyLength;
+}
+
+/**
+ * dualpivot sort compare function
+ */
+class CompareOffset2 {
+public:
+  int64_t operator()(uint32_t lhs, uint32_t rhs) {
+
+    KVBuffer * lhb = (KVBuffer*)get_position(lhs);
+    KVBuffer * rhb = (KVBuffer*)get_position(rhs);
+
+    uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+    int64_t ret = fmemcmp(lhb->content, rhb->content, minlen);
+    if (ret) {
+      return ret;
+    }
+    return lhb->keyLength - rhb->keyLength;
+  }
+};
+
+/**
+ * quicksort compare function
+ */
+class OffsetLessThan2 {
+public:
+  bool operator()(uint32_t lhs, uint32_t rhs) {
+
+    KVBuffer * lhb = (KVBuffer*)get_position(lhs);
+    KVBuffer * rhb = (KVBuffer*)get_position(rhs);
+
+    uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+    int64_t ret = fmemcmp(lhb->content, rhb->content, minlen);
+    return ret < 0 || (ret == 0 && (lhb->keyLength < rhb->keyLength));
+  }
+};
+
+/*
+void makeInput(string & dest, vector<uint32_t> & offsets, uint64_t length) {
+  TeraGen tera = TeraGen(length / 100, 1, 0);
+  dest.reserve(length + 1024);
+  string k, v;
+  while (tera.next(k, v)) {
+    offsets.push_back(dest.length());
+    uint32_t tempLen = k.length();
+    dest.append((const char *)&tempLen, 4);
+    dest.append(k.data(), k.length());
+    tempLen = v.length();
+    dest.append((const char *)&tempLen, 4);
+    dest.append(v.data(), v.length());
+  }
+}
+*/
+
+void makeInputWord(string & dest, vector<uint32_t> & offsets, uint64_t length) {
+  Random r;
+  dest.reserve(length + 1024);
+  string k, v;
+  while (true) {
+    k = r.nextWord();
+    v = r.nextWord();
+    offsets.push_back(dest.length());
+    uint32_t tempLen = k.length();
+    dest.append((const char *)&tempLen, 4);
+    dest.append(k.data(), k.length());
+    tempLen = v.length();
+    dest.append((const char *)&tempLen, 4);
+    dest.append(v.data(), v.length());
+    if (dest.length() > length) {
+      return;
+    }
+  }
+}
+
+TEST(Perf, sort) {
+  vector<uint32_t> offsets;
+  makeInputWord(gBuffer, offsets, 80000000);
+  Timer timer;
+  vector<uint32_t> offsetstemp1_0 = offsets;
+  vector<uint32_t> offsetstemp1_1 = offsets;
+  vector<uint32_t> offsetstemp1_2 = offsets;
+  vector<uint32_t> offsetstemp1_3 = offsets;
+  timer.reset();
+  qsort(&offsetstemp1_0[0], offsetstemp1_0.size(), sizeof(uint32_t), compare_offset);
+  qsort(&offsetstemp1_1[0], offsetstemp1_1.size(), sizeof(uint32_t), compare_offset);
+  qsort(&offsetstemp1_2[0], offsetstemp1_2.size(), sizeof(uint32_t), compare_offset);
+  qsort(&offsetstemp1_3[0], offsetstemp1_3.size(), sizeof(uint32_t), compare_offset);
+  LOG("%s", timer.getInterval("qsort").c_str());
+  offsetstemp1_0 = offsets;
+  offsetstemp1_1 = offsets;
+  offsetstemp1_2 = offsets;
+  offsetstemp1_3 = offsets;
+  timer.reset();
+  qsort(&offsetstemp1_0[0], offsetstemp1_0.size(), sizeof(uint32_t), compare_offset2);
+  qsort(&offsetstemp1_1[0], offsetstemp1_1.size(), sizeof(uint32_t), compare_offset2);
+  qsort(&offsetstemp1_2[0], offsetstemp1_2.size(), sizeof(uint32_t), compare_offset2);
+  qsort(&offsetstemp1_3[0], offsetstemp1_3.size(), sizeof(uint32_t), compare_offset2);
+  LOG("%s", timer.getInterval("qsort 2").c_str());
+  offsetstemp1_0 = offsets;
+  offsetstemp1_1 = offsets;
+  offsetstemp1_2 = offsets;
+  offsetstemp1_3 = offsets;
+  timer.reset();
+  std::sort(offsetstemp1_0.begin(), offsetstemp1_0.end(), OffsetLessThan());
+  std::sort(offsetstemp1_1.begin(), offsetstemp1_1.end(), OffsetLessThan());
+  std::sort(offsetstemp1_2.begin(), offsetstemp1_2.end(), OffsetLessThan());
+  std::sort(offsetstemp1_3.begin(), offsetstemp1_3.end(), OffsetLessThan());
+  LOG("%s", timer.getInterval("std::sort").c_str());
+  offsetstemp1_0 = offsets;
+  offsetstemp1_1 = offsets;
+  offsetstemp1_2 = offsets;
+  offsetstemp1_3 = offsets;
+  timer.reset();
+  std::sort(offsetstemp1_0.begin(), offsetstemp1_0.end(), OffsetLessThan2());
+  std::sort(offsetstemp1_1.begin(), offsetstemp1_1.end(), OffsetLessThan2());
+  std::sort(offsetstemp1_2.begin(), offsetstemp1_2.end(), OffsetLessThan2());
+  std::sort(offsetstemp1_3.begin(), offsetstemp1_3.end(), OffsetLessThan2());
+  LOG("%s", timer.getInterval("std::sort 2").c_str());
+  offsetstemp1_0 = offsets;
+  offsetstemp1_1 = offsets;
+  offsetstemp1_2 = offsets;
+  offsetstemp1_3 = offsets;
+  timer.reset();
+  DualPivotQuicksort(offsetstemp1_0, CompareOffset());
+  DualPivotQuicksort(offsetstemp1_1, CompareOffset());
+  DualPivotQuicksort(offsetstemp1_2, CompareOffset());
+  DualPivotQuicksort(offsetstemp1_3, CompareOffset());
+  LOG("%s", timer.getInterval("DualPivotQuicksort").c_str());
+  offsetstemp1_0 = offsets;
+  offsetstemp1_1 = offsets;
+  offsetstemp1_2 = offsets;
+  offsetstemp1_3 = offsets;
+  timer.reset();
+  DualPivotQuicksort(offsetstemp1_0, CompareOffset2());
+  DualPivotQuicksort(offsetstemp1_1, CompareOffset2());
+  DualPivotQuicksort(offsetstemp1_2, CompareOffset2());
+  DualPivotQuicksort(offsetstemp1_3, CompareOffset2());
+  LOG("%s", timer.getInterval("DualPivotQuicksort 2").c_str());
+}
+
+TEST(Perf, sortCacheMiss) {
+
+  LOG("Testing partition based sort, sort 4MB every time");
+
+  vector<uint32_t> offsets;
+  makeInputWord(gBuffer, offsets, 80000000);
+  Timer timer;
+  vector<uint32_t> offsetstemp1_0 = offsets;
+  vector<uint32_t> offsetstemp1_1 = offsets;
+  vector<uint32_t> offsetstemp1_2 = offsets;
+  vector<uint32_t> offsetstemp1_3 = offsets;
+
+  timer.reset();
+  DualPivotQuicksort(offsetstemp1_0, CompareOffset2());
+  DualPivotQuicksort(offsetstemp1_1, CompareOffset2());
+  DualPivotQuicksort(offsetstemp1_2, CompareOffset2());
+  DualPivotQuicksort(offsetstemp1_3, CompareOffset2());
+  LOG("%s", timer.getInterval("DualPivotQuicksort 2 full sort").c_str());
+
+  uint32_t MOD = 128000;
+  uint32_t END = offsets.size();
+
+  for (MOD = 1024; MOD < END; MOD <<= 1) {
+    offsetstemp1_0 = offsets;
+    offsetstemp1_1 = offsets;
+    offsetstemp1_2 = offsets;
+    offsetstemp1_3 = offsets;
+    timer.reset();
+
+    for (uint32_t i = 0; i <= END / MOD; i++) {
+      int base = i * MOD;
+      int max = (base + MOD) > END ? END : (base + MOD);
+      DualPivotQuicksort(offsetstemp1_0, base, max - 1, 3, CompareOffset2());
+    }
+
+    for (uint32_t i = 0; i <= END / MOD; i++) {
+      int base = i * MOD;
+      int max = (base + MOD) > END ? END : (base + MOD);
+      DualPivotQuicksort(offsetstemp1_1, base, max - 1, 3, CompareOffset2());
+    }
+
+    for (uint32_t i = 0; i <= END / MOD; i++) {
+      int base = i * MOD;
+      int max = (base + MOD) > END ? END : (base + MOD);
+      DualPivotQuicksort(offsetstemp1_2, base, max - 1, 3, CompareOffset2());
+    }
+
+    for (uint32_t i = 0; i <= END / MOD; i++) {
+      int base = i * MOD;
+      int max = (base + MOD) > END ? END : (base + MOD);
+      DualPivotQuicksort(offsetstemp1_3, base, max - 1, 3, CompareOffset2());
+    }
+    LOG("%s, MOD: %d", timer.getInterval("DualPivotQuicksort 2 partition sort").c_str(), MOD);
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteArray.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteArray.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteArray.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteArray.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(ByteArray, read) {
+  ByteArray * buffer = new ByteArray();
+  buffer->resize(10);
+  ASSERT_EQ(10, buffer->size());
+  char * buff1 = buffer->buff();
+
+  buffer->resize(15);
+  ASSERT_EQ(15, buffer->size());
+  ASSERT_EQ(buffer->buff(), buff1);
+
+  buffer->resize(30);
+  ASSERT_EQ(30, buffer->size());
+  ASSERT_NE(buffer->buff(), buff1);
+
+  delete buffer;
+}
+
+} /* namespace NativeTask */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteBuffer.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteBuffer.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteBuffer.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteBuffer.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(ByteBuffer, read) {
+  char * buff = new char[100];
+  ByteBuffer byteBuffer;
+  byteBuffer.reset(buff, 100);
+
+  ASSERT_EQ(0, byteBuffer.position());
+  ASSERT_EQ(100, byteBuffer.capacity());
+  ASSERT_EQ(0, byteBuffer.limit());
+
+  ASSERT_EQ(buff, byteBuffer.current());
+  ASSERT_EQ(0, byteBuffer.remain());
+
+  int newPos = byteBuffer.advance(3);
+  ASSERT_EQ(3, byteBuffer.current() - byteBuffer.base());
+
+  byteBuffer.rewind(10, 20);
+  ASSERT_EQ(20, byteBuffer.limit());
+
+  ASSERT_EQ(10, byteBuffer.position());
+}
+} /* namespace NativeTask */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForDualPivotQuickSort.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForDualPivotQuickSort.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForDualPivotQuickSort.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForDualPivotQuickSort.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "lib/MemoryBlock.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+static const char * expectedSrc = NULL;
+static int expectedSrcLength = 0;
+
+static const char * expectedDest = NULL;
+static int expectedDestLength = 0;
+
+static int compareResult = 0;
+
+void checkInputArguments(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  ASSERT_EQ(expectedSrc, src);
+  ASSERT_EQ(expectedSrcLength, srcLength);
+
+  ASSERT_EQ(expectedDest, dest);
+  ASSERT_EQ(expectedDestLength, destLength);
+}
+
+int MockComparatorForDualPivot(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  checkInputArguments(src, srcLength, dest, destLength);
+  return compareResult;
+}
+
+TEST(ComparatorForDualPivotQuickSort, compare) {
+  char * buff = new char[100];
+  KVBuffer * kv1 = (KVBuffer *)buff;
+
+  const char * KEY = "KEY";
+  const char * VALUE = "VALUE";
+
+  kv1->keyLength = strlen(KEY);
+  char * key = kv1->getKey();
+  ::memcpy(key, KEY, strlen(KEY));
+  kv1->valueLength = strlen(VALUE);
+  char * value = kv1->getValue();
+  ::memcpy(value, VALUE, strlen(VALUE));
+
+  const char * KEY2 = "KEY2";
+  const char * VALUE2 = "VALUE2";
+
+  KVBuffer * kv2 = kv1->next();
+  kv2->keyLength = strlen(KEY2);
+  char * key2 = kv2->getKey();
+  ::memcpy(key2, KEY2, strlen(KEY2));
+  kv2->valueLength = strlen(VALUE2);
+  char * value2 = kv2->getValue();
+  ::memcpy(value2, VALUE2, strlen(VALUE2));
+
+  ComparatorForDualPivotSort comparator(buff, &MockComparatorForDualPivot);
+
+  expectedSrc = kv1->getKey();
+  expectedSrcLength = strlen(KEY);
+
+  expectedDest = kv2->getKey();
+  expectedDestLength = strlen(KEY2);
+
+  compareResult = -1;
+
+  ASSERT_EQ(-1, comparator((char * )kv1 - buff, (char * )kv2 - buff));
+}
+
+} /* namespace NativeTask */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForStdSort.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForStdSort.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForStdSort.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForStdSort.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "lib/MemoryBlock.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+static const char * expectedSrc = NULL;
+static int expectedSrcLength = 0;
+
+static const char * expectedDest = NULL;
+static int expectedDestLength = 0;
+
+static int compareResult = 0;
+
+void checkInputArgumentsForStdOut(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  ASSERT_EQ(expectedSrc, src);
+  ASSERT_EQ(expectedSrcLength, srcLength);
+
+  ASSERT_EQ(expectedDest, dest);
+  ASSERT_EQ(expectedDestLength, destLength);
+}
+
+int MockComparatorForStdOut(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  checkInputArgumentsForStdOut(src, srcLength, dest, destLength);
+  return compareResult;
+}
+
+TEST(ComparatorForStdSort, compare) {
+  char * buff = new char[100];
+  KVBuffer * kv1 = (KVBuffer *)buff;
+
+  const char * KEY = "KEY";
+  const char * VALUE = "VALUE";
+
+  kv1->keyLength = strlen(KEY);
+  char * key = kv1->getKey();
+  ::memcpy(key, KEY, strlen(KEY));
+  kv1->valueLength = strlen(VALUE);
+  char * value = kv1->getValue();
+  ::memcpy(value, VALUE, strlen(VALUE));
+
+  const char * KEY2 = "KEY2";
+  const char * VALUE2 = "VALUE2";
+
+  KVBuffer * kv2 = kv1->next();
+  kv2->keyLength = strlen(KEY2);
+  char * key2 = kv2->getKey();
+  ::memcpy(key2, KEY2, strlen(KEY2));
+  kv2->valueLength = strlen(VALUE2);
+  char * value2 = kv2->getValue();
+  ::memcpy(value2, VALUE2, strlen(VALUE2));
+
+  ComparatorForStdSort comparator(buff, &MockComparatorForStdOut);
+
+  expectedSrc = kv1->content;
+  expectedSrcLength = strlen(KEY);
+
+  expectedDest = kv2->content;
+  expectedDestLength = strlen(KEY2);
+
+  compareResult = -1;
+
+  ASSERT_EQ(true, comparator((char * )kv1 - buff, (char * )kv2 - buff));
+}
+
+} /* namespace NativeTask */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestFixSizeContainer.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestFixSizeContainer.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestFixSizeContainer.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestFixSizeContainer.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(FixSizeContainer, test) {
+  uint32_t length = 100;
+  FixSizeContainer * container = new FixSizeContainer();
+  char * bytes = new char[length];
+  container->wrap(bytes, length);
+
+  ASSERT_EQ(0, container->position());
+  int pos1 = 3;
+  container->position(pos1);
+  ASSERT_EQ(pos1, container->position());
+  ASSERT_EQ(length - pos1, container->remain());
+
+  container->rewind();
+  ASSERT_EQ(0, container->position());
+  ASSERT_EQ(length, container->size());
+
+  std::string toBeFilled = "Hello, FixContainer";
+
+  container->fill(toBeFilled.c_str(), toBeFilled.length());
+
+  for (int i = 0; i < container->position(); i++) {
+    char * c = container->base() + i;
+    ASSERT_EQ(toBeFilled[i], *c);
+  }
+}
+
+} /* namespace NativeTask */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestIterator.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestIterator.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestIterator.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestIterator.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+class MockIterator : public KVIterator {
+  std::vector<std::pair<int, int> > * kvs;
+  uint32_t index;
+  uint32_t expectedKeyGroupNum;
+  std::map<int, int> expectkeyCountMap;
+  char * buffer;
+
+public:
+  MockIterator()
+      : index(0), buffer(NULL) {
+    buffer = new char[8];
+    kvs = new std::vector<std::pair<int, int> >();
+    kvs->push_back(std::pair<int, int>(10, 100));
+
+    kvs->push_back(std::pair<int, int>(10, 100));
+    kvs->push_back(std::pair<int, int>(10, 101));
+    kvs->push_back(std::pair<int, int>(10, 102));
+
+    kvs->push_back(std::pair<int, int>(20, 200));
+    kvs->push_back(std::pair<int, int>(20, 201));
+    kvs->push_back(std::pair<int, int>(20, 202));
+    kvs->push_back(std::pair<int, int>(30, 302));
+    kvs->push_back(std::pair<int, int>(40, 302));
+    this->expectedKeyGroupNum = 4;
+
+    expectkeyCountMap[10] = 4;
+    expectkeyCountMap[20] = 3;
+    expectkeyCountMap[30] = 1;
+    expectkeyCountMap[40] = 1;
+  }
+
+  bool next(Buffer & key, Buffer & outValue) {
+    if (index < kvs->size()) {
+      std::pair<int, int> value = kvs->at(index);
+      *((int *)buffer) = value.first;
+      *(((int *)buffer) + 1) = value.second;
+      key.reset(buffer, 4);
+      outValue.reset(buffer + 4, 4);
+      index++;
+      return true;
+    }
+    return false;
+  }
+
+  uint32_t getExpectedKeyGroupCount() {
+    return expectedKeyGroupNum;
+  }
+
+  std::map<int, int>& getExpectedKeyCountMap() {
+    return expectkeyCountMap;
+  }
+};
+
+void TestKeyGroupIterator() {
+  MockIterator * iter = new MockIterator();
+  KeyGroupIteratorImpl * groupIterator = new KeyGroupIteratorImpl(iter);
+  const char * key = NULL;
+
+  uint32_t keyGroupCount = 0;
+  std::map<int, int> actualKeyCount;
+  while (groupIterator->nextKey()) {
+    keyGroupCount++;
+    uint32_t length = 0;
+    key = groupIterator->getKey(length);
+    int * keyPtr = (int *)key;
+    std::cout << "new key group(key group hold kvs of same key): " << *keyPtr << std::endl;
+    const char * value = NULL;
+    while (NULL != (value = groupIterator->nextValue(length))) {
+      int * valuePtr = (int *)value;
+      std::cout << "==== key: " << *keyPtr << "value: " << *valuePtr << std::endl;
+
+      if (actualKeyCount.find(*keyPtr) == actualKeyCount.end()) {
+        actualKeyCount[*keyPtr] = 0;
+      }
+      actualKeyCount[*keyPtr]++;
+    }
+  }
+  ASSERT_EQ(iter->getExpectedKeyGroupCount(), keyGroupCount);
+  std::map<int, int> & expectedKeyCountMap = iter->getExpectedKeyCountMap();
+  for (std::map<int, int>::iterator keyCountIter = actualKeyCount.begin();
+      keyCountIter != actualKeyCount.end(); ++keyCountIter) {
+    uint32_t key = keyCountIter->first;
+    uint32_t expectedCount = expectedKeyCountMap[key];
+    ASSERT_EQ(expectedCount, keyCountIter->second);
+  }
+
+  std::cout << "Done!!!!!!! " << std::endl;
+}
+
+TEST(Iterator, keyGroupIterator) {
+  TestKeyGroupIterator();
+}
+
+} /* namespace NativeTask */
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestKVBuffer.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestKVBuffer.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestKVBuffer.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestKVBuffer.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(KVBuffer, test) {
+
+  char * buff = new char[100];
+  KVBuffer * kv1 = (KVBuffer *)buff;
+
+  const char * KEY = "KEY";
+  const char * VALUE = "VALUE";
+
+  kv1->keyLength = strlen(KEY);
+  char * key = kv1->getKey();
+  ::memcpy(key, KEY, strlen(KEY));
+  kv1->valueLength = strlen(VALUE);
+  char * value = kv1->getValue();
+  ::memcpy(value, VALUE, strlen(VALUE));
+
+  ASSERT_EQ(strlen(KEY) + strlen(VALUE) + 8, kv1->length());
+
+  ASSERT_EQ(8, kv1->getKey() - buff);
+  ASSERT_EQ(strlen(KEY) + 8, kv1->getValue() - buff);
+
+  kv1->keyLength = bswap(kv1->keyLength);
+  kv1->valueLength = bswap(kv1->valueLength);
+
+  ASSERT_EQ(8, kv1->headerLength());
+  ASSERT_EQ(strlen(KEY) + strlen(VALUE) + 8, kv1->lengthConvertEndium());
+}
+
+} /* namespace NativeTask */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemBlockIterator.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemBlockIterator.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemBlockIterator.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemBlockIterator.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "test_commons.h"
+#include "MapOutputSpec.h"
+#include "lib/MemoryBlock.h"
+
+using namespace NativeTask;
+
+namespace NativeTask {
+
+TEST(MemoryBlockIterator, test) {
+  const uint32_t BUFFER_LENGTH = 100;
+  const uint32_t BLOCK_ID = 3;
+  char * bytes = new char[BUFFER_LENGTH];
+  MemoryBlock block(bytes, BUFFER_LENGTH);
+
+  const uint32_t KV_SIZE = 60;
+  KVBuffer * kv1 = block.allocateKVBuffer(KV_SIZE);
+  KVBuffer * kv2 = block.allocateKVBuffer(KV_SIZE);
+
+  MemBlockIterator iter(&block);
+
+  uint32_t keyCount = 0;
+  while (iter.next()) {
+    KVBuffer * kv = iter.getKVBuffer();
+    ASSERT_EQ(block.getKVBuffer(keyCount), kv);
+    keyCount++;
+  }
+}
+
+class MemoryBlockFactory {
+public:
+  static MemoryBlock * create(std::vector<int> & keys) {
+    const uint32_t BUFFER_LENGTH = 1000;
+    const uint32_t BLOCK_ID = 3;
+    char * bytes = new char[BUFFER_LENGTH];
+    MemoryBlock * block1 = new MemoryBlock(bytes, BUFFER_LENGTH);
+
+    const uint32_t KV_SIZE = 16;
+
+    for (uint32_t i = 0; i < keys.size(); i++) {
+      uint32_t index = keys[i];
+      KVBuffer * kv = block1->allocateKVBuffer(KV_SIZE);
+
+      kv->keyLength = 4;
+      kv->valueLength = 4;
+      uint32_t * key = (uint32_t *)kv->getKey();
+      *key = bswap(index);
+    }
+    return block1;
+  }
+
+};
+
+TEST(MemoryBlockIterator, compare) {
+  std::vector<int> vector1;
+
+  vector1.push_back(2);
+  vector1.push_back(4);
+  vector1.push_back(6);
+
+  std::vector<int> vector2;
+
+  vector2.push_back(1);
+  vector2.push_back(3);
+  vector2.push_back(5);
+
+  ComparatorPtr bytesComparator = NativeTask::get_comparator(BytesType, NULL);
+
+  MemoryBlock * block1 = MemoryBlockFactory::create(vector1);
+  MemoryBlock * block2 = MemoryBlockFactory::create(vector2);
+
+  block1->sort(CPPSORT, bytesComparator);
+  block2->sort(CPPSORT, bytesComparator);
+
+  MemBlockIterator * iter1 = new MemBlockIterator(block1);
+  MemBlockIterator * iter2 = new MemBlockIterator(block2);
+
+  MemBlockComparator comparator(bytesComparator);
+
+  ASSERT_EQ(false, comparator(iter1, iter2));
+
+  iter1->next();
+  ASSERT_EQ(true, comparator(iter1, iter2));
+
+  iter2->next();
+  ASSERT_EQ(false, comparator(iter1, iter2));
+}
+}
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryBlock.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryBlock.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryBlock.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryBlock.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "test_commons.h"
+#include "MapOutputSpec.h"
+#include "lib/MemoryBlock.h"
+
+using namespace NativeTask;
+namespace NativeTaskTest {
+
+TEST(MemoryBlock, test) {
+  const uint32_t BUFFER_LENGTH = 1000;
+  char * bytes = new char[BUFFER_LENGTH];
+  MemoryBlock block(bytes, BUFFER_LENGTH);
+
+  uint32_t NON_EXIST = 3;
+  ASSERT_EQ(NULL, block.getKVBuffer(NON_EXIST));
+  ASSERT_EQ(0, block.getKVCount());
+  ASSERT_EQ(BUFFER_LENGTH, block.remainSpace());
+
+  ComparatorPtr bytesComparator = NativeTask::get_comparator(BytesType, NULL);
+  block.sort(CPPSORT, bytesComparator);
+  ASSERT_EQ(true, block.sorted());
+
+  const uint32_t KV_SIZE = 16;
+  KVBuffer * kv1 = block.allocateKVBuffer(KV_SIZE);
+  KVBuffer * kv2 = block.allocateKVBuffer(KV_SIZE);
+
+  ASSERT_EQ(2, block.getKVCount());
+  ASSERT_EQ(kv1, block.getKVBuffer(0));
+
+  ASSERT_EQ(BUFFER_LENGTH - 2 * KV_SIZE, block.remainSpace());
+  ASSERT_EQ(false, block.sorted());
+}
+
+TEST(MemoryBlock, overflow) {
+  const uint32_t BUFFER_LENGTH = 100;
+  char * bytes = new char[BUFFER_LENGTH];
+  MemoryBlock block(bytes, BUFFER_LENGTH);
+
+  const uint32_t KV_SIZE = 60;
+  KVBuffer * kv1 = block.allocateKVBuffer(KV_SIZE);
+  KVBuffer * kv2 = block.allocateKVBuffer(KV_SIZE);
+
+  ASSERT_EQ(kv1, block.getKVBuffer(0));
+  ASSERT_EQ(kv2, block.getKVBuffer(1));
+
+  ASSERT_EQ(1, block.getKVCount());
+
+  ASSERT_EQ(BUFFER_LENGTH - KV_SIZE, block.remainSpace());
+}
+
+TEST(MemoryBlock, sort) {
+  const uint32_t BUFFER_LENGTH = 1000;
+  char * bytes = new char[BUFFER_LENGTH];
+  MemoryBlock block(bytes, BUFFER_LENGTH);
+
+  const uint32_t KV_SIZE = 16;
+  KVBuffer * big = block.allocateKVBuffer(KV_SIZE);
+  KVBuffer * small = block.allocateKVBuffer(KV_SIZE);
+  KVBuffer * medium = block.allocateKVBuffer(KV_SIZE);
+
+  const uint32_t SMALL = 100;
+  const uint32_t MEDIUM = 1000;
+  const uint32_t BIG = 10000;
+
+  medium->keyLength = 4;
+  medium->valueLength = 4;
+  uint32_t * mediumKey = (uint32_t *)medium->getKey();
+  *mediumKey = bswap(MEDIUM);
+
+  small->keyLength = 4;
+  small->valueLength = 4;
+  uint32_t * smallKey = (uint32_t *)small->getKey();
+  *smallKey = bswap(SMALL);
+
+  big->keyLength = 4;
+  big->valueLength = 4;
+  uint32_t * bigKey = (uint32_t *)big->getKey();
+  *bigKey = bswap(BIG);
+
+  ComparatorPtr bytesComparator = NativeTask::get_comparator(BytesType, NULL);
+  block.sort(CPPSORT, bytesComparator);
+
+  ASSERT_EQ(small, block.getKVBuffer(0));
+  ASSERT_EQ(medium, block.getKVBuffer(1));
+  ASSERT_EQ(big, block.getKVBuffer(2));
+}
+
+}
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryPool.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryPool.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryPool.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryPool.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "test_commons.h"
+#include "PartitionBucket.h"
+#include "PartitionBucketIterator.h"
+#include "MemoryBlock.h"
+#include "IFile.h"
+
+using namespace NativeTask;
+
+namespace NativeTask {
+
+TEST(MemoryPool, general) {
+  MemoryPool * pool = new MemoryPool();
+  const uint32_t POOL_SIZE = 1024;
+
+  pool->init(POOL_SIZE);
+
+  uint32_t min = 1024;
+  uint32_t expect = 2048;
+  uint32_t allocated = 0;
+  char * buff = pool->allocate(min, expect, allocated);
+  ASSERT_NE((void *)NULL, buff);
+  buff = pool->allocate(min, expect, allocated);
+  ASSERT_EQ(NULL, buff);
+
+  pool->reset();
+  buff = pool->allocate(min, expect, allocated);
+  ASSERT_NE((void *)NULL, buff);
+
+  delete pool;
+}
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestPartitionBucket.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestPartitionBucket.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestPartitionBucket.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestPartitionBucket.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "test_commons.h"
+#include "PartitionBucket.h"
+#include "PartitionBucketIterator.h"
+#include "MemoryBlock.h"
+#include "IFile.h"
+
+using namespace NativeTask;
+
+namespace NativeTask {
+
+class MockIFileWriter : public IFileWriter {
+private:
+  char * _buff;
+  uint32_t _position;
+  uint32_t _capacity;
+public:
+
+  MockIFileWriter(char * buff, uint32_t capacity)
+      : IFileWriter(NULL, CHECKSUM_NONE, TextType, TextType, "", NULL), _buff(buff), _position(0),
+          _capacity(capacity) {
+  }
+
+  virtual void write(const char * key, uint32_t keyLen, const char * value, uint32_t valueLen) {
+    KVBuffer * kv = (KVBuffer *)(_buff + _position);
+    kv->keyLength = keyLen;
+    kv->valueLength = valueLen;
+    *((uint32_t *)kv->getKey()) = *((uint32_t *)key);
+    *((uint32_t *)kv->getValue()) = *((uint32_t *)value);
+    _position += kv->length();
+  }
+
+  char * buff() {
+    return _buff;
+  }
+};
+
+TEST(PartitionBucket, general) {
+  MemoryPool * pool = new MemoryPool();
+  const uint32_t POOL_SIZE = 1024 * 1024; //1MB
+  const uint32_t BLOCK_SIZE = 1024; //1KB
+  const uint32_t PARTITION_ID = 3;
+  pool->init(POOL_SIZE);
+  ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
+  PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
+  ASSERT_EQ(0, bucket->getKVCount());
+  KVIterator * NULLPOINTER = 0;
+  ASSERT_EQ(NULLPOINTER, bucket->getIterator());
+  ASSERT_EQ(PARTITION_ID, bucket->getPartitionId());
+  bucket->sort(DUALPIVOTSORT);
+  bucket->spill(NULL);
+
+  delete bucket;
+  delete pool;
+}
+
+TEST(PartitionBucket, multipleMemoryBlock) {
+  MemoryPool * pool = new MemoryPool();
+  const uint32_t POOL_SIZE = 1024 * 1024; //1MB
+  const uint32_t BLOCK_SIZE = 1024; //1KB
+  const uint32_t PARTITION_ID = 3;
+  pool->init(POOL_SIZE);
+  ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
+  PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
+
+  const uint32_t KV_SIZE = 700;
+  const uint32_t SMALL_KV_SIZE = 100;
+  KVBuffer * kv1 = bucket->allocateKVBuffer(KV_SIZE);
+  KVBuffer * kv2 = bucket->allocateKVBuffer(SMALL_KV_SIZE);
+  KVBuffer * kv3 = bucket->allocateKVBuffer(KV_SIZE);
+
+  ASSERT_EQ(3, bucket->getKVCount());
+  KVIterator * NULLPOINTER = 0;
+  ASSERT_NE(NULLPOINTER, bucket->getIterator());
+  ASSERT_EQ(2, bucket->getMemoryBlockCount());
+
+  bucket->reset();
+  ASSERT_EQ(NULLPOINTER, bucket->getIterator());
+  ASSERT_EQ(0, bucket->getMemoryBlockCount());
+
+  delete bucket;
+  delete pool;
+}
+
+TEST(PartitionBucket, sort) {
+  MemoryPool * pool = new MemoryPool();
+  const uint32_t POOL_SIZE = 1024 * 1024; //1MB
+  const uint32_t BLOCK_SIZE = 1024; //1KB
+  const uint32_t PARTITION_ID = 3;
+  pool->init(POOL_SIZE);
+  ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
+  PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
+
+  const uint32_t KV_SIZE = 700;
+  const uint32_t SMALL_KV_SIZE = 100;
+  KVBuffer * kv1 = bucket->allocateKVBuffer(KV_SIZE);
+  KVBuffer * kv2 = bucket->allocateKVBuffer(SMALL_KV_SIZE);
+  KVBuffer * kv3 = bucket->allocateKVBuffer(KV_SIZE);
+
+  const uint32_t SMALL = 10;
+  const uint32_t MEDIUM = 100;
+  const uint32_t BIG = 1000;
+
+  kv1->keyLength = 4;
+  *((uint32_t *)kv1->getKey()) = bswap(BIG);
+  kv1->valueLength = KV_SIZE - kv1->headerLength() - kv1->keyLength;
+
+  kv2->keyLength = 4;
+  *((uint32_t *)kv2->getKey()) = bswap(SMALL);
+  kv2->valueLength = KV_SIZE - kv2->headerLength() - kv2->keyLength;
+
+  kv3->keyLength = 4;
+  *((uint32_t *)kv3->getKey()) = bswap(MEDIUM);
+  kv3->valueLength = KV_SIZE - kv3->headerLength() - kv3->keyLength;
+
+  bucket->sort(DUALPIVOTSORT);
+
+  KVIterator * iter = bucket->getIterator();
+
+  Buffer key;
+  Buffer value;
+  iter->next(key, value);
+
+  ASSERT_EQ(SMALL, bswap(*(uint32_t * )key.data()));
+
+  iter->next(key, value);
+  ASSERT_EQ(MEDIUM, bswap(*(uint32_t * )key.data()));
+
+  iter->next(key, value);
+  ASSERT_EQ(BIG, bswap(*(uint32_t * )key.data()));
+
+  delete iter;
+  delete bucket;
+  delete pool;
+}
+
+TEST(PartitionBucket, spill) {
+  MemoryPool * pool = new MemoryPool();
+  const uint32_t POOL_SIZE = 1024 * 1024; //1MB
+  const uint32_t BLOCK_SIZE = 1024; //1KB
+  const uint32_t PARTITION_ID = 3;
+  pool->init(POOL_SIZE);
+  ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
+  PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
+
+  const uint32_t KV_SIZE = 700;
+  const uint32_t SMALL_KV_SIZE = 100;
+  KVBuffer * kv1 = bucket->allocateKVBuffer(KV_SIZE);
+  KVBuffer * kv2 = bucket->allocateKVBuffer(SMALL_KV_SIZE);
+  KVBuffer * kv3 = bucket->allocateKVBuffer(KV_SIZE);
+
+  const uint32_t SMALL = 10;
+  const uint32_t MEDIUM = 100;
+  const uint32_t BIG = 1000;
+
+  kv1->keyLength = 4;
+  *((uint32_t *)kv1->getKey()) = bswap(BIG);
+  kv1->valueLength = KV_SIZE - KVBuffer::headerLength() - kv1->keyLength;
+
+  kv2->keyLength = 4;
+  *((uint32_t *)kv2->getKey()) = bswap(SMALL);
+  kv2->valueLength = KV_SIZE - KVBuffer::headerLength() - kv2->keyLength;
+
+  kv3->keyLength = 4;
+  *((uint32_t *)kv3->getKey()) = bswap(MEDIUM);
+  kv3->valueLength = KV_SIZE - KVBuffer::headerLength() - kv3->keyLength;
+
+  bucket->sort(DUALPIVOTSORT);
+
+  uint32_t BUFF_SIZE = 1024 * 1024;
+  char * buff = new char[BUFF_SIZE];
+  MockIFileWriter writer(buff, BUFF_SIZE);
+  bucket->spill(&writer);
+
+  //check the result
+  KVBuffer * first = (KVBuffer *)writer.buff();
+  ASSERT_EQ(4, first->keyLength);
+  ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, first->valueLength);
+  ASSERT_EQ(bswap(SMALL), (*(uint32_t * )(first->getKey())));
+
+  KVBuffer * second = first->next();
+  ASSERT_EQ(4, second->keyLength);
+  ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, second->valueLength);
+  ASSERT_EQ(bswap(MEDIUM), (*(uint32_t * )(second->getKey())));
+
+  KVBuffer * third = second->next();
+  ASSERT_EQ(4, third->keyLength);
+  ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, third->valueLength);
+  ASSERT_EQ(bswap(BIG), (*(uint32_t * )(third->getKey())));
+
+  delete bucket;
+  delete pool;
+}
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "BufferStream.h"
+#include "Buffers.h"
+#include "test_commons.h"
+
+TEST(Buffers, AppendRead) {
+  string codec = "";
+  vector<string> data;
+  Generate(data, 1000000, "word");
+  string dest;
+  dest.reserve(64 * 1024 * 1024);
+  OutputStringStream outputStream = OutputStringStream(dest);
+  AppendBuffer appendBuffer;
+  appendBuffer.init(64 * 1024, &outputStream, codec);
+  for (size_t i = 0; i < data.size(); i++) {
+    appendBuffer.write(data[i].c_str(), data[i].length());
+  }
+  appendBuffer.flush();
+  InputBuffer inputBuffer = InputBuffer(dest.c_str(), dest.length());
+  ReadBuffer readBuffer = ReadBuffer();
+  readBuffer.init(64 * 1024, &inputBuffer, codec);
+  for (size_t i = 0; i < data.size(); i++) {
+    const char * rd = readBuffer.get(data[i].length());
+    ASSERT_EQ(data[i], string(rd, data[i].length()));
+  }
+}
+
+TEST(Buffers, AppendReadSnappy) {
+  string codec = "org.apache.hadoop.io.compress.SnappyCodec";
+  vector<string> data;
+  Generate(data, 1000000, "word");
+  string dest;
+  dest.reserve(64 * 1024 * 1024);
+  OutputStringStream outputStream = OutputStringStream(dest);
+  AppendBuffer appendBuffer;
+  appendBuffer.init(64 * 1024, &outputStream, codec);
+  for (size_t i = 0; i < data.size(); i++) {
+    appendBuffer.write(data[i].c_str(), data[i].length());
+  }
+  appendBuffer.flush();
+  InputBuffer inputBuffer = InputBuffer(dest.c_str(), dest.length());
+  ReadBuffer readBuffer = ReadBuffer();
+  readBuffer.init(64 * 1024, &inputBuffer, codec);
+  for (size_t i = 0; i < data.size(); i++) {
+    const char * rd = readBuffer.get(data[i].length());
+    ASSERT_EQ(data[i], string(rd, data[i].length()));
+  }
+}
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadWriteBuffer.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadWriteBuffer.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadWriteBuffer.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadWriteBuffer.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(ReadWriteBuffer, readAndWrite) {
+  ReadWriteBuffer buff(16);
+
+  int INT = 100;
+  int LONG = 200;
+  std::string STR = "hello, readWriteBuffer";
+  void * POINTER = this;
+
+  int REPEAT = 10;
+
+  for (int i = 0; i < REPEAT; i++) {
+    buff.writeInt(INT);
+    buff.writeLong(LONG);
+    buff.writeString(&STR);
+    buff.writePointer(POINTER);
+    buff.writeString(STR.c_str(), STR.length());
+  }
+
+  uint32_t writePoint = buff.getWritePoint();
+  LOG("Current Write Point: %d", writePoint);
+
+  for (int i = 0; i < REPEAT; i++) {
+    ASSERT_EQ(INT, buff.readInt());
+    ASSERT_EQ(LONG, buff.readLong());
+    string * read = buff.readString();
+    LOG("READ STRING: %s", read->c_str());
+    ASSERT_EQ(0, STR.compare(read->c_str()));
+    delete read;
+
+    ASSERT_EQ(POINTER, buff.readPointer());
+
+    read = buff.readString();
+    LOG("READ STRING: %s", read->c_str());
+    ASSERT_EQ(0, STR.compare(read->c_str()));
+  }
+
+  uint32_t readPoint = buff.getReadPoint();
+  ASSERT_EQ(writePoint, readPoint);
+
+  buff.setWritePoint(0);
+  buff.setReadPoint(0);
+
+  ASSERT_EQ(0, buff.getReadPoint());
+  ASSERT_EQ(0, buff.getWritePoint());
+}
+
+} /* namespace NativeTask */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestTrackingCollector.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestTrackingCollector.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestTrackingCollector.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestTrackingCollector.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(TrackingCollector, read) {
+  const std::string GROUP("group");
+  const std::string KEY("key");
+  Counter * counter = new Counter(GROUP, KEY);
+  Collector * collector = new Collector();
+  TrackingCollector tracking(collector, counter);
+  tracking.collect(NULL, 0, NULL, 0);
+  ASSERT_EQ(1, counter->get());
+  delete counter;
+  delete collector;
+}
+} /* namespace NativeTask */



Mime
View raw message