hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [11/64] [abbrv] Import initial code for MAPREDUCE-2841 (native output collector)
Date Sat, 13 Sep 2014 01:41:16 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc
new file mode 100644
index 0000000..a8d7e1b
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/Timer.h"
+#include "util/StringUtil.h"
+#include "Merge.h"
+#include "lib/FileSystem.h"
+
+namespace NativeTask {
+
+/**
+ * Build a merge entry that reads back one spill file.
+ * The file at spill->path is opened through the local FileSystem and wrapped
+ * in an IFileReader; the reader is handed to the new IFileMergeEntry, whose
+ * destructor deletes it.
+ */
+IFileMergeEntry * IFileMergeEntry::create(SingleSpillInfo * spill) {
+  InputStream * spillStream = FileSystem::getLocal().open(spill->path);
+  return new IFileMergeEntry(new IFileReader(spillStream, spill, true));
+}
+
+/**
+ * Set up a merger that writes its output through `writer`.
+ * No merge entries are registered here; they are added with addMergeEntry().
+ * The key comparator is wrapped in a MergeEntryComparator so that it can
+ * order whole merge entries by their current key.
+ */
+Merger::Merger(IFileWriter * writer, Config * config, ComparatorPtr comparator,
+    ICombineRunner * combineRunner)
+    : _writer(writer),
+      _config(config),
+      _combineRunner(combineRunner),
+      _first(true),
+      _comparator(comparator) {
+}
+
+/**
+ * Destroy the merger together with every entry passed to addMergeEntry().
+ * The heap only holds pointers that also live in _entries, so it is emptied
+ * without deleting anything.
+ */
+Merger::~Merger() {
+  _heap.clear();
+  for (vector<MergeEntryPtr>::iterator it = _entries.begin(); it != _entries.end(); ++it) {
+    delete *it;
+  }
+  _entries.clear();
+}
+
+/**
+ * Register one input of the merge.
+ * The pointer is stored as-is; ~Merger() deletes it.
+ */
+void Merger::addMergeEntry(MergeEntryPtr pme) {
+  _entries.push_back(pme);
+}
+
+/**
+ * Advance every merge entry to its next partition.
+ *
+ * @return true if the entries have another partition (the writer's partition
+ *         is started as well); false if there are no more partitions or no
+ *         entries at all
+ * @throws IOException if the entries disagree on whether a next partition
+ *         exists
+ */
+bool Merger::startPartition() {
+  bool hasPartition = false;
+  for (size_t idx = 0; idx < _entries.size(); idx++) {
+    bool advanced = _entries[idx]->nextPartition();
+    if (idx == 0) {
+      // the first entry decides; all others must agree with it
+      hasPartition = advanced;
+    } else if (advanced != hasPartition) {
+      THROW_EXCEPTION(IOException, "MergeEntry partition number not equal");
+    }
+  }
+  if (!hasPartition) {
+    return false;
+  }
+  _writer->startPartition();
+  return true;
+}
+
+/**
+ * Finish the current partition by closing it on the writer.
+ */
+void Merger::endPartition() {
+  _writer->endPartition();
+}
+
+/**
+ * Rebuild the heap for the current partition.
+ * Each registered entry is advanced to its first record; entries that have
+ * none are left out. The remaining entries are arranged into a min-heap
+ * ordered by _comparator, so _heap[0] ends up as the entry whose current key
+ * compares lowest.
+ */
+void Merger::initHeap() {
+  _heap.clear();
+  for (size_t i = 0; i < _entries.size(); i++) {
+    MergeEntryPtr pme = _entries[i];
+    if (pme->next()) {
+      _heap.push_back(pme);
+    }
+  }
+  // A partition may be empty in every entry (merge() checks for that), and
+  // indexing element 0 of an empty vector is undefined behaviour, so only
+  // touch the storage when there is something to order.
+  if (!_heap.empty()) {
+    MergeEntryPtr * base = &(_heap[0]);
+    makeHeap(base, base + _heap.size(), _comparator);
+  }
+}
+
+/**
+ * Move the merge to the next record of the current partition.
+ *
+ * The first call after _first was set only clears the flag, because
+ * initHeap() has already positioned every entry on its first record.
+ * Later calls advance the entry at the heap root and restore heap order,
+ * or remove that entry once it is exhausted.
+ *
+ * @return true while _heap[0] refers to a valid current record
+ */
+bool Merger::next() {
+  size_t heapSize = _heap.size();
+  if (heapSize == 0) {
+    return false;
+  }
+  if (_first) {
+    _first = false;
+    return true;
+  }
+
+  MergeEntryPtr * root = &(_heap[0]);
+  if (!root[0]->next()) {
+    // no more, pop heap
+    popHeap(root, root + heapSize, _comparator);
+    _heap.pop_back();
+    return _heap.size() > 0;
+  }
+
+  // have more, adjust heap
+  if (heapSize == 2) {
+    // two entries: a single comparison replaces the general sift-down
+    if (_comparator(root[1], root[0])) {
+      std::swap(root[0], root[1]);
+    }
+  } else if (heapSize > 2) {
+    heapify(root, 1, heapSize, _comparator);
+  }
+  return true;
+}
+
+/**
+ * KVIterator interface: advance and expose the current record.
+ * On success `key` and `value` are reset to refer to the key/value bytes of
+ * the entry at the heap root.
+ *
+ * @return false when the current partition has no more records
+ */
+bool Merger::next(Buffer & key, Buffer & value) {
+  if (!next()) {
+    return false;
+  }
+  MergeEntryPtr top = _heap[0];
+  key.reset(top->getKey(), top->getKeyLength());
+  value.reset(top->getValue(), top->getValueLength());
+  return true;
+}
+
+/**
+ * Merge all registered entries, partition by partition, into the writer.
+ *
+ * For each partition the heap is rebuilt from the entries that have records.
+ * Without a combiner, the record at the heap root is written and the heap
+ * advanced until it drains. With a combiner, this merger is handed to it as
+ * the record source. A summary line is logged at the end.
+ */
+void Merger::merge() {
+  Timer timer;
+  // counts only records written directly below; records routed through the
+  // combiner are not included
+  uint64_t total_record = 0;
+  _heap.reserve(_entries.size());
+  while (startPartition()) {
+    initHeap();
+    if (_heap.size() == 0) {
+      endPartition();
+      continue;
+    }
+    _first = true;
+    if (_combineRunner == NULL) {
+      while (next()) {
+        // next() returned true, so the heap is non-empty here. The root is
+        // read on every iteration instead of caching &_heap[0] up front,
+        // which would index an empty vector.
+        MergeEntryPtr top = _heap[0];
+        _writer->write(top->getKey(), top->getKeyLength(), top->getValue(),
+            top->getValueLength());
+        total_record++;
+      }
+    } else {
+      _combineRunner->combine(CombineContext(UNKNOWN), this, _writer);
+    }
+    endPartition();
+  }
+
+  uint64_t interval = (timer.now() - timer.last());
+  uint64_t M = 1000000; //1 million
+
+  uint64_t output_size = 0;
+  uint64_t real_output_size = 0;
+  _writer->getStatistics(output_size, real_output_size);
+
+  // The casts make each variadic argument match its conversion specifier
+  // regardless of how size_t / uint64_t are defined on the platform.
+  if (total_record != 0) {
+    LOG("[Merge] Merged segment#: %lu, record#: %llu, avg record size: %llu, uncompressed total bytes: %llu, compressed total bytes: %llu, time: %llu ms",
+        (unsigned long)_entries.size(),
+        (unsigned long long)total_record,
+        (unsigned long long)(output_size / (total_record)),
+        (unsigned long long)output_size,
+        (unsigned long long)real_output_size,
+        (unsigned long long)(interval / M));
+  } else {
+    LOG("[Merge] Merged segments#, %lu, uncompressed total bytes: %llu, compressed total bytes: %llu, time: %llu ms",
+        (unsigned long)_entries.size(),
+        (unsigned long long)output_size,
+        (unsigned long long)real_output_size,
+        (unsigned long long)(interval / M));
+  }
+}
+
+} // namespace NativeTask

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h
new file mode 100644
index 0000000..956fa41
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MERGE_H_
+#define MERGE_H_
+
+#include "NativeTask.h"
+#include "Buffers.h"
+#include "MapOutputCollector.h"
+#include "IFile.h"
+#include "MinHeap.h"
+
+namespace NativeTask {
+
+/**
+ * One sorted input of a merge: a cursor over partitions and, within the
+ * current partition, over key/value records.
+ */
+class MergeEntry {
+
+protected:
+  // these 4 fields describe the current record; implementations fill them
+  // in next()
+  const char * _key;
+  const char * _value;
+  uint32_t _keyLength;
+  uint32_t _valueLength;
+
+public:
+  MergeEntry()
+      : _key(NULL), _value(NULL), _keyLength(0), _valueLength(0) {
+  }
+
+  const char * getKey() const {
+    return _key;
+  }
+
+  const char * getValue() const {
+    return _value;
+  }
+
+  uint32_t getKeyLength() const {
+    return _keyLength;
+  }
+
+  uint32_t getValueLength() const {
+    return _valueLength;
+  }
+
+  virtual ~MergeEntry() {
+  }
+
+  /**
+   * move to next partition
+   * @return true if there is a next partition, false if there are no more
+   */
+  virtual bool nextPartition() = 0;
+
+  /**
+   * move to next key/value
+   * @return true if a record is now current, false if there are no more
+   */
+  virtual bool next() = 0;
+};
+
+/**
+ * Pointer to a merge entry, as stored and ordered by Merger
+ */
+typedef MergeEntry * MergeEntryPtr;
+
+/**
+ * Strict "less than" ordering of merge entries by their current key.
+ * Wraps a raw key comparator (negative result means lhs sorts first) so it
+ * can be used by the heap helpers in MinHeap.h.
+ */
+class MergeEntryComparator {
+private:
+  ComparatorPtr _keyComparator;
+
+public:
+  MergeEntryComparator(ComparatorPtr comparator)
+      : _keyComparator(comparator) {
+  }
+
+public:
+  /**
+   * @return true if the current key of lhs orders before that of rhs.
+   * Const-qualified because the comparison does not modify the comparator,
+   * which also lets it be called through a const object or reference.
+   */
+  bool operator()(const MergeEntryPtr lhs, const MergeEntryPtr rhs) const {
+    return (*_keyComparator)(lhs->getKey(), lhs->getKeyLength(), rhs->getKey(), rhs->getKeyLength())
+        < 0;
+  }
+};
+
+/**
+ * Merge entry that walks an array of in-memory partition buckets
+ */
+class MemoryMergeEntry : public MergeEntry {
+protected:
+
+  PartitionBucket ** _partitions;
+  uint32_t _number;
+  int64_t _index;
+
+  KVIterator * _iterator;
+  Buffer keyBuffer;
+  Buffer valueBuffer;
+
+public:
+  MemoryMergeEntry(PartitionBucket ** partitions, uint32_t numberOfPartitions)
+      : _partitions(partitions), _number(numberOfPartitions), _index(-1), _iterator(NULL) {
+  }
+
+  virtual ~MemoryMergeEntry() {
+    // deleting a NULL pointer is a no-op, so no check is needed
+    delete _iterator;
+    _iterator = NULL;
+  }
+
+  /**
+   * Advance to the next partition bucket.
+   * @return true if such a partition exists (its bucket may be NULL, in
+   *         which case next() returns false right away); false once all
+   *         partitions have been visited
+   */
+  virtual bool nextPartition() {
+    ++_index;
+    if (_index >= _number) {
+      return false;
+    }
+    // drop the iterator of the previous partition before replacing it
+    delete _iterator;
+    _iterator = NULL;
+    PartitionBucket * bucket = _partitions[_index];
+    if (bucket != NULL) {
+      _iterator = bucket->getIterator();
+    }
+    return true;
+  }
+
+  /**
+   * move to next key/value of the current partition
+   * @return true if a record is now current, false if there are no more
+   */
+  virtual bool next() {
+    if (_iterator == NULL) {
+      return false;
+    }
+    if (!_iterator->next(keyBuffer, valueBuffer)) {
+      // detect error early: poison the fields so stale use is obvious
+      _key = NULL;
+      _value = NULL;
+      _keyLength = 0xffffffff;
+      _valueLength = 0xffffffff;
+      return false;
+    }
+    _key = keyBuffer.data();
+    _keyLength = keyBuffer.length();
+    _value = valueBuffer.data();
+    _valueLength = valueBuffer.length();
+    assert(_value != NULL);
+    return true;
+  }
+};
+
+/**
+ * Merge entry backed by an intermediate (spill) file
+ */
+class IFileMergeEntry : public MergeEntry {
+protected:
+  IFileReader * _reader;
+  bool new_partition;
+public:
+  /**
+   * Create an entry that reads the spill file described by `spill`.
+   */
+  static IFileMergeEntry * create(SingleSpillInfo * spill);
+
+  /**
+   * @param reader: owned by this entry and deleted in its destructor
+   */
+  IFileMergeEntry(IFileReader * reader)
+      : _reader(reader), new_partition(false) {
+  }
+
+  virtual ~IFileMergeEntry() {
+    delete _reader;
+    _reader = NULL;
+  }
+
+  /**
+   * move to next partition
+   * @return whatever the reader reports for its own nextPartition()
+   */
+  virtual bool nextPartition() {
+    return _reader->nextPartition();
+  }
+
+  /**
+   * move to next key/value
+   * @return true if a record is now current, false if the reader returned
+   *         no key
+   */
+  virtual bool next() {
+    const char * upcomingKey = _reader->nextKey(_keyLength);
+    _key = upcomingKey;
+    if (unlikely(NULL == upcomingKey)) {
+      // detect error early: poison the lengths so stale use is obvious
+      _keyLength = 0xffffffffU;
+      _valueLength = 0xffffffffU;
+      return false;
+    }
+    _value = _reader->value(_valueLength);
+    return true;
+  }
+};
+
+/**
+ * K-way merge of MergeEntry inputs into an IFileWriter, optionally through
+ * a combiner. Also usable as a KVIterator over the merged records of the
+ * current partition.
+ */
+class Merger : public KVIterator {
+
+private:
+  // every registered input; deleted in the destructor
+  vector<MergeEntryPtr> _entries;
+  // inputs that still have records in the current partition, kept as a heap
+  vector<MergeEntryPtr> _heap;
+  IFileWriter * _writer;
+  Config * _config;
+  // NULL means records are written directly to _writer
+  ICombineRunner * _combineRunner;
+  // true until the first next() call after a heap rebuild
+  bool _first;
+  MergeEntryComparator _comparator;
+
+public:
+  Merger(IFileWriter * writer, Config * config, ComparatorPtr comparator,
+      ICombineRunner * combineRunner = NULL);
+
+  ~Merger();
+
+  void addMergeEntry(MergeEntryPtr pme);
+
+  void merge();
+
+  virtual bool next(Buffer & key, Buffer & value);
+protected:
+  bool startPartition();
+  void endPartition();
+  void initHeap();
+  bool next();
+};
+
+} // namespace NativeTask
+
+#endif /* MERGE_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h
new file mode 100644
index 0000000..ab434f6
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MIN_HEAP_H_
+#define MIN_HEAP_H_
+
+#include "NativeTask.h"
+#include "Buffers.h"
+
+/**
+ * Sift the element at 1-based position `rt` down until the subtree rooted
+ * there satisfies the heap property.
+ *
+ * @param first    start of the array holding the heap
+ * @param rt       1-based index of the node to sift down
+ * @param heap_len number of elements that belong to the heap
+ * @param Comp     returns true if its first argument orders before its second
+ */
+template<typename T, typename Compare>
+void heapify(T* first, int rt, int heap_len, Compare & Comp) {
+  for (;;) {
+    const int left = rt * 2; // left child
+    if (left > heap_len) {
+      break; // leaf reached
+    }
+    const int right = left + 1; // right child
+    int best = rt;
+    if (Comp(first[left - 1], first[best - 1])) {
+      best = left;
+    }
+    if (right <= heap_len && Comp(first[right - 1], first[best - 1])) {
+      best = right;
+    }
+    if (best == rt) {
+      break; // already in place
+    }
+    std::swap(first[best - 1], first[rt - 1]);
+    rt = best;
+  }
+}
+
+/**
+ * Arrange [begin, end) into a heap ordered by Comp by sifting down every
+ * non-leaf node, from the last one back to the root. An empty range is left
+ * untouched.
+ */
+template<typename T, typename Compare>
+void makeHeap(T* begin, T* end, Compare & Comp) {
+  int count = end - begin;
+  if (count < 0) {
+    return;
+  }
+  uint32_t node = count / 2; // last node that has a child (1-based)
+  while (node >= 1) {
+    heapify(begin, node, count, Comp);
+    --node;
+  }
+}
+
+/**
+ * Remove the root of the heap in [begin, end).
+ * The last element is copied over the root and sifted down, so afterwards
+ * [begin, end - 1) is a heap. The old root value is overwritten, not moved
+ * to the back, and the caller has to drop the last slot itself.
+ */
+template<typename T, typename Compare>
+void popHeap(T* begin, T* end, Compare & Comp) {
+  T* const last = end - 1;
+  *begin = *last;
+  // adjust [begin, end - 1) to heap
+  heapify(begin, 1, last - begin, Comp);
+}
+
+#endif /* MIN_HEAP_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc
new file mode 100644
index 0000000..237b55e
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <dlfcn.h>
+
+#include "commons.h"
+#include "NativeObjectFactory.h"
+#include "NativeLibrary.h"
+
+namespace NativeTask {
+
+//////////////////////////////////////////////////////////////////
+// NativeLibrary methods
+//////////////////////////////////////////////////////////////////
+
+/**
+ * Record the library's path and name. Nothing is loaded here: both entry
+ * point pointers stay NULL until init() resolves them.
+ */
+NativeLibrary::NativeLibrary(const string & path, const string & name)
+    : _path(path), _name(name), _getObjectCreatorFunc(NULL), _functionGetter(NULL) {
+
+}
+
+/**
+ * Load the shared object at _path and resolve its entry points.
+ *
+ * Three symbols are looked up, each named <_name> plus a suffix:
+ * "GetObjectCreator", "GetFunctionGetter" and "Init". A missing symbol is
+ * only logged. If the Init symbol exists it is called once.
+ * The dlopen handle is not stored, so this class never unloads the library.
+ *
+ * @return false only if dlopen() fails
+ */
+bool NativeLibrary::init() {
+  void *library = dlopen(_path.c_str(), RTLD_LAZY | RTLD_GLOBAL);
+  if (NULL == library) {
+    // Read dlerror() before anything else: it is the only source for the
+    // reason of the failure.
+    const char * reason = dlerror();
+    LOG("[NativeLibrary] Load object library %s failed.", _path.c_str());
+    if (NULL != reason) {
+      LOG("[NativeLibrary] dlopen error: %s", reason);
+    }
+    return false;
+  }
+  // clean error status
+  dlerror();
+
+  string create_object_func_name = _name + "GetObjectCreator";
+  _getObjectCreatorFunc = (GetObjectCreatorFunc)dlsym(library, create_object_func_name.c_str());
+  if (NULL == _getObjectCreatorFunc) {
+    LOG("[NativeLibrary] ObjectCreator function [%s] not found", create_object_func_name.c_str());
+  }
+
+  string functionGetter = _name + "GetFunctionGetter";
+  _functionGetter = (FunctionGetter)dlsym(library, functionGetter.c_str());
+  if (NULL == _functionGetter) {
+    LOG("[NativeLibrary] function getter [%s] not found", functionGetter.c_str());
+  }
+
+  string init_library_func_name = _name + "Init";
+  InitLibraryFunc init_library_func = (InitLibraryFunc)dlsym(library,
+      init_library_func_name.c_str());
+  if (NULL == init_library_func) {
+    LOG("[NativeLibrary] Library init function [%s] not found", init_library_func_name.c_str());
+  } else {
+    init_library_func();
+  }
+  return true;
+}
+
+/**
+ * Create an instance of class `clz` from this library.
+ *
+ * @return the new object, or NULL if the library exposes no object-creator
+ *         entry point or that entry point has no creator for `clz`
+ */
+NativeObject * NativeLibrary::createObject(const string & clz) {
+  // The lookup can legitimately come back NULL for a class this library
+  // does not provide, so the creator is checked before it is called.
+  ObjectCreatorFunc creator = getObjectCreator(clz);
+  if (NULL == creator) {
+    return NULL;
+  }
+  return (NativeObject*)(creator());
+}
+
+/**
+ * Look up a function by name through the library's function getter.
+ * @return whatever the getter returns, or NULL if the library has no getter
+ */
+void * NativeLibrary::getFunction(const string & functionName) {
+  void * found = NULL;
+  if (_functionGetter != NULL) {
+    found = (*_functionGetter)(functionName);
+  }
+  return found;
+}
+
+/**
+ * Look up the creator function for class `clz`.
+ * @return whatever the library's object-creator entry point returns, or NULL
+ *         if the library has no such entry point
+ */
+ObjectCreatorFunc NativeLibrary::getObjectCreator(const string & clz) {
+  ObjectCreatorFunc creator = NULL;
+  if (_getObjectCreatorFunc != NULL) {
+    creator = _getObjectCreatorFunc(clz);
+  }
+  return creator;
+}
+
+} // namespace NativeTask
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h
new file mode 100644
index 0000000..4eda215
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NATIVELIBRARY_H_
+#define NATIVELIBRARY_H_
+
+#include <string>
+
+namespace NativeTask {
+
+using std::string;
+class NativeObject;
+class NativeObjectFactory;
+
+/**
+ * User level object library abstraction: one dynamically loaded library
+ * that can hand out object creators and named functions.
+ *
+ * NOTE(review): GetObjectCreatorFunc, FunctionGetter and ObjectCreatorFunc
+ * are not declared by anything this header includes; presumably it relies on
+ * NativeTask.h being included first. Confirm.
+ */
+class NativeLibrary {
+  friend class NativeObjectFactory;
+private:
+  string _path;
+  string _name;
+  // entry points resolved by init(); NULL when the library lacks them
+  GetObjectCreatorFunc _getObjectCreatorFunc;
+  FunctionGetter _functionGetter;
+public:
+  NativeLibrary(const string & path, const string & name);
+
+  // loads the library; returns false if it cannot be opened
+  bool init();
+
+  NativeObject * createObject(const string & clz);
+
+  void * getFunction(const string & functionName);
+
+  ObjectCreatorFunc getObjectCreator(const string & clz);
+
+  ~NativeLibrary() {
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* NATIVELIBRARY_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc
new file mode 100644
index 0000000..bcd00f4
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <signal.h>
+#ifndef __CYGWIN__
+#include <execinfo.h>
+#endif
+#include "commons.h"
+#include "NativeTask.h"
+#include "NativeObjectFactory.h"
+#include "NativeLibrary.h"
+#include "BufferStream.h"
+#include "util/StringUtil.h"
+#include "util/SyncUtils.h"
+#include "util/WritableUtils.h"
+#include "handler/BatchHandler.h"
+#include "handler/MCollectorOutputHandler.h"
+#include "handler/CombineHandler.h"
+
+using namespace NativeTask;
+
+// TODO: just for debug, should be removed
+// Signal handler: reports the signal number, dumps up to 10 stack frames to
+// file descriptor 2 (not on Cygwin) and exits with status 1.
+extern "C" void handler(int sig) {
+  fprintf(stderr, "Error: signal %d:\n", sig);
+
+#ifndef __CYGWIN__
+  // get void*'s for all entries on the stack and print them out
+  void *frames[10];
+  size_t depth = backtrace(frames, 10);
+  backtrace_symbols_fd(frames, depth, 2);
+#endif
+
+  exit(1);
+}
+
+// Registration body for the built-in "NativeTask" library: registers the
+// built-in classes and then names the default class for each object type.
+// DEFINE_NATIVE_LIBRARY and REGISTER_CLASS are project macros whose
+// expansion is not visible in this file; presumably they generate the
+// NativeTaskInit / NativeTaskGetObjectCreator functions used by
+// NativeObjectFactory::Init() -- TODO confirm.
+DEFINE_NATIVE_LIBRARY(NativeTask) {
+  //signal(SIGSEGV, handler);
+  REGISTER_CLASS(BatchHandler, NativeTask);
+  REGISTER_CLASS(CombineHandler, NativeTask);
+  REGISTER_CLASS(MCollectorOutputHandler, NativeTask);
+  REGISTER_CLASS(Mapper, NativeTask);
+  REGISTER_CLASS(Reducer, NativeTask);
+  REGISTER_CLASS(Partitioner, NativeTask);
+  REGISTER_CLASS(Folder, NativeTask);
+  NativeObjectFactory::SetDefaultClass(BatchHandlerType, "NativeTask.BatchHandler");
+  NativeObjectFactory::SetDefaultClass(MapperType, "NativeTask.Mapper");
+  NativeObjectFactory::SetDefaultClass(ReducerType, "NativeTask.Reducer");
+  NativeObjectFactory::SetDefaultClass(PartitionerType, "NativeTask.Partitioner");
+  NativeObjectFactory::SetDefaultClass(FolderType, "NativeTask.Folder");
+}
+
+namespace NativeTask {
+
+// process-wide configuration object that GlobalConfig points to
+static Config G_CONFIG;
+
+// Definitions of NativeObjectFactory's static members.
+// Libraries: loaded libraries, searched from last registered to first.
+vector<NativeLibrary *> NativeObjectFactory::Libraries;
+map<NativeObjectType, string> NativeObjectFactory::DefaultClasses;
+Config * NativeObjectFactory::GlobalConfig = &G_CONFIG;
+float NativeObjectFactory::LastProgress = 0;
+Progress * NativeObjectFactory::TaskProgress = NULL;
+string NativeObjectFactory::LastStatus;
+// Counters and CounterLastUpdateValues are parallel vectors (same index);
+// CounterSet is used to find an already created counter.
+set<Counter *> NativeObjectFactory::CounterSet;
+vector<Counter *> NativeObjectFactory::Counters;
+vector<uint64_t> NativeObjectFactory::CounterLastUpdateValues;
+bool NativeObjectFactory::Inited = false;
+
+// guards Init() and Release()
+static Lock FactoryLock;
+
+/**
+ * One-time initialization of the factory, guarded by FactoryLock.
+ *
+ * Selects the log device from NATIVE_LOG_DEVICE, registers the built-in
+ * NativeTask library, then loads the extra libraries listed in
+ * NATIVE_CLASS_LIBRARY_BUILDIN as comma separated "name=path" pairs.
+ *
+ * @return false if one of the configured extra libraries cannot be
+ *         registered; true otherwise, including when already initialized
+ */
+bool NativeObjectFactory::Init() {
+  ScopeLock<Lock> autolocak(FactoryLock);
+  if (Inited == false) {
+    // setup log device
+    string device = GetConfig().get(NATIVE_LOG_DEVICE, "stderr");
+    if (device == "stdout") {
+      LOG_DEVICE = stdout;
+    } else if (device == "stderr") {
+      LOG_DEVICE = stderr;
+    } else {
+      FILE * logFile = fopen(device.c_str(), "w");
+      if (NULL == logFile) {
+        // Never leave LOG_DEVICE NULL: Release() passes it to fclose(), and
+        // presumably LOG() writes to it. Fall back to stderr instead.
+        LOG_DEVICE = stderr;
+        LOG("[NativeObjectLibrary] Cannot open log device [%s], using stderr", device.c_str());
+      } else {
+        LOG_DEVICE = logFile;
+      }
+    }
+    NativeTaskInit();
+    NativeLibrary * library = new NativeLibrary("libnativetask.so", "NativeTask");
+    library->_getObjectCreatorFunc = NativeTaskGetObjectCreator;
+    Libraries.push_back(library);
+    // Set before loading extra libraries: RegisterLibrary() calls
+    // CheckInit(), and the flag keeps that call from re-entering Init().
+    Inited = true;
+    // load extra user provided libraries
+    string libraryConf = GetConfig().get(NATIVE_CLASS_LIBRARY_BUILDIN, "");
+    if (libraryConf.length() > 0) {
+      vector<string> libraries;
+      vector<string> pair;
+      StringUtil::Split(libraryConf, ",", libraries, true);
+      for (size_t i = 0; i < libraries.size(); i++) {
+        pair.clear();
+        StringUtil::Split(libraries[i], "=", pair, true);
+        if (pair.size() == 2) {
+          string & name = pair[0];
+          string & path = pair[1];
+          LOG("[NativeObjectLibrary] Try to load library [%s] with file [%s]", name.c_str(),
+              path.c_str());
+          if (false == RegisterLibrary(path, name)) {
+            LOG("[NativeObjectLibrary] RegisterLibrary failed: name=%s path=%s", name.c_str(),
+                path.c_str());
+            // NOTE(review): Inited stays true on this path, so a later
+            // CheckInit() will not retry or report the failure again.
+            return false;
+          } else {
+            LOG("[NativeObjectLibrary] RegisterLibrary success: name=%s path=%s", name.c_str(),
+                path.c_str());
+          }
+        } else {
+          LOG("[NativeObjectLibrary] Illegal native.class.libray: [%s] in [%s]",
+              libraries[i].c_str(), libraryConf.c_str());
+        }
+      }
+    }
+    const char * version = GetConfig().get(NATIVE_HADOOP_VERSION);
+    LOG("[NativeObjectLibrary] NativeTask library initialized with hadoop %s",
+        version==NULL?"unkown":version);
+  }
+  return true;
+}
+
+/**
+ * Undo Init(): delete all registered libraries and all counters, close a
+ * file-backed log device and mark the factory as uninitialized.
+ */
+void NativeObjectFactory::Release() {
+  ScopeLock<Lock> autolocak(FactoryLock);
+  // libraries are destroyed in reverse registration order
+  for (ssize_t i = Libraries.size() - 1; i >= 0; i--) {
+    delete Libraries[i];
+    Libraries[i] = NULL;
+  }
+  Libraries.clear();
+  for (size_t i = 0; i < Counters.size(); i++) {
+    delete Counters[i];
+  }
+  Counters.clear();
+  // CounterSet holds the same pointers that were just deleted, and
+  // CounterLastUpdateValues is indexed in parallel with Counters. Both must
+  // be emptied too, or a later GetCounter() would see dangling pointers and
+  // misaligned baselines.
+  CounterSet.clear();
+  CounterLastUpdateValues.clear();
+  if (LOG_DEVICE != NULL && LOG_DEVICE != stdout && LOG_DEVICE != stderr) {
+    fclose(LOG_DEVICE);
+  }
+  if (LOG_DEVICE != stdout) {
+    // a closed or missing log device falls back to stderr
+    LOG_DEVICE = stderr;
+  }
+  Inited = false;
+}
+
+/**
+ * Make sure the factory is initialized, running Init() if it is not.
+ * @throws IOException if Init() reports failure
+ */
+void NativeObjectFactory::CheckInit() {
+  if (Inited == false) {
+    if (!Init()) {
+      // Thrown by value: `throw new IOException(...)` would throw a pointer,
+      // which handlers catching IOException by reference never see, and
+      // which leaks the allocation.
+      throw IOException("Init NativeTask library failed.");
+    }
+  }
+}
+
+/** @return the global configuration object, by reference */
+Config & NativeObjectFactory::GetConfig() {
+  return *GlobalConfig;
+}
+
+/** @return the global configuration object, as a pointer */
+Config * NativeObjectFactory::GetConfigPtr() {
+  return GlobalConfig;
+}
+
+/**
+ * Set the object GetTaskProgress() polls for progress. The pointer is only
+ * stored; passing NULL makes GetTaskProgress() return the last known value.
+ */
+void NativeObjectFactory::SetTaskProgressSource(Progress * progress) {
+  TaskProgress = progress;
+}
+
+/**
+ * @return the progress reported by the current progress source; without a
+ *         source, the value cached by the previous call (initially 0)
+ */
+float NativeObjectFactory::GetTaskProgress() {
+  if (NULL == TaskProgress) {
+    return LastProgress;
+  }
+  LastProgress = TaskProgress->getProgress();
+  return LastProgress;
+}
+
+/**
+ * Store the status text to be sent with the next GetTaskStatusUpdate(),
+ * replacing any text not yet sent.
+ */
+void NativeObjectFactory::SetTaskStatus(const string & status) {
+  LastStatus = status;
+}
+
+static Lock CountersLock;
+
+/**
+ * Serialize the current task status into statusData.
+ *
+ * Encoding, in write order:
+ *   progress: float
+ *   status: Text
+ *   number of counters: int
+ *   per counter: group (Text), name (Text), increment since last call (long)
+ *
+ * The pending status text is cleared, and each counter's baseline is moved
+ * to its current value, so the next call reports only new increments.
+ */
+void NativeObjectFactory::GetTaskStatusUpdate(string & statusData) {
+  OutputStringStream os(statusData);
+  float progress = GetTaskProgress();
+  WritableUtils::WriteFloat(&os, progress);
+  WritableUtils::WriteText(&os, LastStatus);
+  LastStatus.clear();
+
+  // counters are read and rebased under their own lock
+  ScopeLock<Lock> AutoLock(CountersLock);
+  uint32_t numCounter = (uint32_t)Counters.size();
+  WritableUtils::WriteInt(&os, numCounter);
+  for (size_t idx = 0; idx < numCounter; idx++) {
+    Counter * counter = Counters[idx];
+    uint64_t current = counter->get();
+    uint64_t delta = current - CounterLastUpdateValues[idx];
+    CounterLastUpdateValues[idx] = current;
+    WritableUtils::WriteText(&os, counter->group());
+    WritableUtils::WriteText(&os, counter->name());
+    WritableUtils::WriteLong(&os, delta);
+  }
+}
+
+/**
+ * Find the counter identified by (group, name), creating it on first use.
+ * A new counter is appended to Counters with a baseline of 0 in
+ * CounterLastUpdateValues and added to CounterSet. Runs under CountersLock.
+ *
+ * @return the counter; it remains owned by the factory
+ */
+Counter * NativeObjectFactory::GetCounter(const string & group, const string & name) {
+  ScopeLock<Lock> AutoLock(CountersLock);
+  // temporary used only as the lookup key
+  Counter probe(group, name);
+  set<Counter *>::iterator found = CounterSet.find(&probe);
+  if (found == CounterSet.end()) {
+    Counter * created = new Counter(group, name);
+    Counters.push_back(created);
+    CounterLastUpdateValues.push_back(0);
+    CounterSet.insert(created);
+    return created;
+  }
+  return *found;
+}
+
+/**
+ * Associate class name `clz` with its creator function, replacing any
+ * earlier registration under the same name.
+ * NOTE(review): NativeTaskClassMap__ is not defined in the visible source;
+ * presumably DEFINE_NATIVE_LIBRARY(NativeTask) declares it -- confirm.
+ */
+void NativeObjectFactory::RegisterClass(const string & clz, ObjectCreatorFunc func) {
+  NativeTaskClassMap__[clz] = func;
+}
+
+/**
+ * Create an instance of class `clz` using the registered libraries.
+ * @return the new object, or NULL if no library provides a creator for it
+ */
+NativeObject * NativeObjectFactory::CreateObject(const string & clz) {
+  ObjectCreatorFunc creator = GetObjectCreator(clz);
+  if (!creator) {
+    return NULL;
+  }
+  return creator();
+}
+
+/**
+ * Look up a function by name, asking the most recently registered library
+ * first so later registrations take precedence.
+ * @return the first non-NULL result, or NULL if no library has the function
+ */
+void * NativeObjectFactory::GetFunction(const string & funcName) {
+  CheckInit();
+  for (size_t remaining = Libraries.size(); remaining > 0; --remaining) {
+    void * found = Libraries[remaining - 1]->getFunction(funcName);
+    if (found != NULL) {
+      return found;
+    }
+  }
+  return NULL;
+}
+
+/**
+ * Look up the creator for class `clz`, asking the most recently registered
+ * library first so later registrations take precedence.
+ * @return the first non-NULL creator, or NULL if no library knows the class
+ */
+ObjectCreatorFunc NativeObjectFactory::GetObjectCreator(const string & clz) {
+  CheckInit();
+  for (size_t remaining = Libraries.size(); remaining > 0; --remaining) {
+    ObjectCreatorFunc creator = Libraries[remaining - 1]->getObjectCreator(clz);
+    if (creator != NULL) {
+      return creator;
+    }
+  }
+  return NULL;
+}
+
+/** Destroy an object with delete; passing NULL is harmless. */
+void NativeObjectFactory::ReleaseObject(NativeObject * obj) {
+  delete obj;
+}
+
+/**
+ * Load the library at `path` and, if that succeeds, add it to the end of
+ * the registered libraries.
+ * @return true if the library was loaded and registered, false otherwise
+ */
+bool NativeObjectFactory::RegisterLibrary(const string & path, const string & name) {
+  CheckInit();
+  NativeLibrary * candidate = new NativeLibrary(path, name);
+  if (candidate->init()) {
+    Libraries.push_back(candidate);
+    return true;
+  }
+  delete candidate;
+  return false;
+}
+
+// guards writes to DefaultClasses
+static Lock DefaultClassesLock;
+
+/**
+ * Set the class name used by CreateDefaultObject() for objects of `type`,
+ * replacing any earlier default for that type.
+ */
+void NativeObjectFactory::SetDefaultClass(NativeObjectType type, const string & clz) {
+  ScopeLock<Lock> autolocak(DefaultClassesLock);
+  DefaultClasses[type] = clz;
+}
+
+/**
+ * Create an instance of the default class registered for `type`.
+ * @return the new object; NULL if no default class is registered for the
+ *         type (this is logged) or the class cannot be created
+ */
+NativeObject * NativeObjectFactory::CreateDefaultObject(NativeObjectType type) {
+  CheckInit();
+  map<NativeObjectType, string>::iterator entry = DefaultClasses.find(type);
+  if (entry == DefaultClasses.end()) {
+    LOG("[NativeObjectLibrary] Default class for NativeObjectType %s not found",
+        NativeObjectTypeToString(type).c_str());
+    return NULL;
+  }
+  // copy the name so the map entry is not referenced during creation
+  string clz = entry->second;
+  return CreateObject(clz);
+}
+
+/**
+ * Compare two byte sequences over their common prefix; if the prefixes are
+ * equal, the shorter sequence orders first.
+ *
+ * @return a negative value, 0 or a positive value if src orders before,
+ *         equal to or after dest
+ */
+int NativeObjectFactory::BytesComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+
+  uint32_t minlen = std::min(srcLength, destLength);
+  int64_t ret = fmemcmp(src, dest, minlen);
+  if (ret > 0) {
+    return 1;
+  } else if (ret < 0) {
+    return -1;
+  }
+  // Compare the lengths rather than subtracting them: the unsigned
+  // difference converted to int has the wrong sign once the lengths differ
+  // by more than INT_MAX.
+  if (srcLength == destLength) {
+    return 0;
+  }
+  return (srcLength > destLength) ? 1 : -1;
+}
+
+/**
+ * Compare two keys by their first byte only.
+ * srcLength and destLength are not consulted.
+ * @return the difference between the first char of src and of dest
+ */
+int NativeObjectFactory::ByteComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  const int lhs = *src;
+  const int rhs = *dest;
+  return lhs - rhs;
+}
+
+/**
+ * Compare two keys as 32-bit values.
+ * The first bytes are compared as plain char first; only when they are
+ * equal are the byte-swapped 32-bit words compared as unsigned values.
+ * srcLength and destLength are not consulted.
+ * NOTE(review): presumably the keys are 4-byte big-endian ints and the
+ * leading-byte comparison supplies the sign ordering -- confirm with callers.
+ * @return the first-byte difference when non-zero, otherwise -1, 0 or 1
+ */
+int NativeObjectFactory::IntComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  const int leadingDiff = (*src) - (*dest);
+  if (leadingDiff != 0) {
+    return leadingDiff;
+  }
+  const uint32_t from = bswap(*(uint32_t*)src);
+  const uint32_t to = bswap(*(uint32_t*)dest);
+  if (from == to) {
+    return 0;
+  }
+  return (from > to) ? 1 : -1;
+}
+
+/**
+ * Compare two keys as 64-bit values.
+ * The first bytes are compared as plain char first; only when they are
+ * equal are the byte-swapped 64-bit words compared as unsigned values.
+ * srcLength and destLength are not consulted.
+ * NOTE(review): presumably the keys are 8-byte big-endian longs -- confirm.
+ * @return the first-byte difference when non-zero, otherwise -1, 0 or 1
+ */
+int NativeObjectFactory::LongComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  const int leadingDiff = (int)(*src) - (int)(*dest);
+  if (leadingDiff != 0) {
+    return leadingDiff;
+  }
+  const uint64_t from = bswap64(*(uint64_t*)src);
+  const uint64_t to = bswap64(*(uint64_t*)dest);
+  if (from == to) {
+    return 0;
+  }
+  return (from > to) ? 1 : -1;
+}
+
+/**
+ * Compare two keys decoded with WritableUtils::ReadVInt().
+ * @return -1, 0 or 1 according to the order of the decoded 32-bit values
+ */
+int NativeObjectFactory::VIntComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  const int32_t lhs = WritableUtils::ReadVInt(src, srcLength);
+  const int32_t rhs = WritableUtils::ReadVInt(dest, destLength);
+  if (lhs == rhs) {
+    return 0;
+  }
+  return (lhs > rhs) ? 1 : -1;
+}
+
+/**
+ * Compare two keys decoded with WritableUtils::ReadVLong().
+ * @return -1, 0 or 1 according to the order of the decoded 64-bit values
+ */
+int NativeObjectFactory::VLongComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  const int64_t lhs = WritableUtils::ReadVLong(src, srcLength);
+  const int64_t rhs = WritableUtils::ReadVLong(dest, destLength);
+  if (lhs == rhs) {
+    return 0;
+  }
+  return (lhs > rhs) ? 1 : -1;
+}
+
+/**
+ * Compare two 4-byte keys as floats.
+ * Each key is read as a 32-bit word, byte-swapped, and its bits are
+ * reinterpreted as a float before comparison.
+ * Throws IOException (via THROW_EXCEPTION_EX) unless both lengths are 4.
+ * @return -1 when src < dest, 0 when equal, 1 otherwise
+ */
+int NativeObjectFactory::FloatComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  if (!(srcLength == 4 && destLength == 4)) {
+    THROW_EXCEPTION_EX(IOException, "float comparator, while src/dest lengt is not 4");
+  }
+
+  uint32_t from = bswap(*(uint32_t*)src);
+  uint32_t to = bswap(*(uint32_t*)dest);
+
+  // Reinterpret the swapped bit patterns as floats.
+  const float lhs = *(float *)(&from);
+  const float rhs = *(float *)(&to);
+
+  if (lhs < rhs) {
+    return -1;
+  }
+  if (lhs == rhs) {
+    return 0;
+  }
+  return 1;
+}
+
+/**
+ * Compare two 8-byte keys as doubles.
+ * Each key is read as a 64-bit word, byte-swapped, and its bits are
+ * reinterpreted as a double before comparison.
+ * Throws IOException (via THROW_EXCEPTION_EX) unless both lengths are 8.
+ * @return -1 when src < dest, 0 when equal, 1 otherwise
+ */
+int NativeObjectFactory::DoubleComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  if (srcLength != 8 || destLength != 8) {
+    // The message used to claim the expected length was 4.
+    THROW_EXCEPTION_EX(IOException, "double comparator, while src/dest length is not 8");
+  }
+
+  uint64_t from = bswap64(*(uint64_t*)src);
+  uint64_t to = bswap64(*(uint64_t*)dest);
+
+  double * srcValue = (double *)(&from);
+  double * destValue = (double *)(&to);
+  if ((*srcValue) < (*destValue)) {
+    return -1;
+  } else if ((*srcValue) == (*destValue)) {
+    return 0;
+  } else {
+    return 1;
+  }
+}
+
+/**
+ * Select the key comparator.
+ * When comparatorName is given, the comparator is looked up by that name
+ * through NativeObjectFactory::GetFunction() (which may return NULL).
+ * Otherwise a built-in comparator is chosen from the key type.
+ * @return the comparator, or NULL when the key type has no built-in one
+ */
+ComparatorPtr get_comparator(const KeyValueType keyType, const char * comparatorName) {
+  if (NULL != comparatorName) {
+    void * custom = NativeObjectFactory::GetFunction(string(comparatorName));
+    return (ComparatorPtr)custom;
+  }
+
+  if (keyType == BytesType || keyType == TextType) {
+    return &NativeObjectFactory::BytesComparator;
+  }
+  if (keyType == ByteType || keyType == BoolType) {
+    return &NativeObjectFactory::ByteComparator;
+  }
+  if (keyType == IntType) {
+    return &NativeObjectFactory::IntComparator;
+  }
+  if (keyType == LongType) {
+    return &NativeObjectFactory::LongComparator;
+  }
+  if (keyType == FloatType) {
+    return &NativeObjectFactory::FloatComparator;
+  }
+  if (keyType == DoubleType) {
+    return &NativeObjectFactory::DoubleComparator;
+  }
+  if (keyType == VIntType) {
+    return &NativeObjectFactory::VIntComparator;
+  }
+  if (keyType == VLongType) {
+    return &NativeObjectFactory::VLongComparator;
+  }
+  return NULL;
+}
+} // namespace NativeTask
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h
new file mode 100644
index 0000000..afbae2c
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NATIVEOBJECTFACTORY_H_
+#define NATIVEOBJECTFACTORY_H_
+
+#include <string>
+#include <vector>
+#include <set>
+#include <map>
+
+#include "NativeTask.h"
+
+namespace NativeTask {
+
+using std::string;
+using std::vector;
+using std::map;
+using std::set;
+using std::pair;
+
+class NativeLibrary;
+
+/**
+ * Native object factory
+ *
+ * A purely static class: every data member and method is static, so it is
+ * used as NativeObjectFactory::Method() without creating an instance.
+ */
+class NativeObjectFactory {
+private:
+  // Registered native libraries; GetFunction() and GetObjectCreator() scan
+  // this vector from back to front, RegisterLibrary() appends to it.
+  static vector<NativeLibrary *> Libraries;
+  // Default class name per object type; written by SetDefaultClass() and
+  // read by CreateDefaultObject().
+  static map<NativeObjectType, string> DefaultClasses;
+  static Config * GlobalConfig;
+  static float LastProgress;
+  static Progress * TaskProgress;
+  static string LastStatus;
+  static set<Counter *> CounterSet;
+  static vector<Counter *> Counters;
+  static vector<uint64_t> CounterLastUpdateValues;
+  static bool Inited;
+public:
+  static bool Init();
+  static void Release();
+  static void CheckInit();
+  static Config & GetConfig();
+  static Config * GetConfigPtr();
+  static void SetTaskProgressSource(Progress * progress);
+  static float GetTaskProgress();
+  static void SetTaskStatus(const string & status);
+  static void GetTaskStatusUpdate(string & statusData);
+  static Counter * GetCounter(const string & group, const string & name);
+  static void RegisterClass(const string & clz, ObjectCreatorFunc func);
+  static NativeObject * CreateObject(const string & clz);
+  // Looks up a function by name in the registered libraries; returns NULL
+  // when none provides it. (The parameter is a function name; the
+  // definition calls it funcName.)
+  static void * GetFunction(const string & clz);
+  // Returns the creator registered for class name clz, or NULL.
+  static ObjectCreatorFunc GetObjectCreator(const string & clz);
+  // Deletes obj.
+  static void ReleaseObject(NativeObject * obj);
+  // Loads and initializes a library; returns false when its init() fails.
+  static bool RegisterLibrary(const string & path, const string & name);
+  static void SetDefaultClass(NativeObjectType type, const string & clz);
+  // Creates an object of the default class for type, or returns NULL when
+  // no default class has been set.
+  static NativeObject * CreateDefaultObject(NativeObjectType type);
+
+  // Key comparators. Each takes two raw key buffers with their lengths and
+  // returns a negative, zero or positive value for src <, ==, > dest.
+
+  // Byte-wise lexicographic order; shorter buffer first on equal prefix.
+  static int BytesComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  // Compares the first byte only.
+  static int ByteComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  // Compares the keys as byte-swapped 32-bit words.
+  static int IntComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  // Compares the keys as byte-swapped 64-bit words.
+  static int LongComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  // Compares values decoded with WritableUtils::ReadVInt().
+  static int VIntComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  // Compares values decoded with WritableUtils::ReadVLong().
+  static int VLongComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  // Compares 4-byte keys as floats; throws IOException on any other length.
+  static int FloatComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  // Compares 8-byte keys as doubles; throws IOException on any other length.
+  static int DoubleComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+};
+
+} // namespace NativeTask
+
+#endif /* NATIVEOBJECTFACTORY_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc
new file mode 100644
index 0000000..3e81565
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef QUICK_BUILD
+#include "org_apache_hadoop_mapred_nativetask_NativeRuntime.h"
+#endif
+#include "commons.h"
+#include "jniutils.h"
+#include "NativeObjectFactory.h"
+
+using namespace NativeTask;
+
+///////////////////////////////////////////////////////////////
+// NativeRuntime JNI methods
+///////////////////////////////////////////////////////////////
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIRelease
+ * Signature: ()V
+ *
+ * Calls NativeObjectFactory::Release() and maps native exceptions onto Java
+ * exceptions. A JavaException is only logged, leaving handling to Java.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRelease(
+    JNIEnv * jenv, jclass nativeRuntimeClass) {
+  try {
+    NativeTask::NativeObjectFactory::Release();
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("[NativeRuntimeJniImpl] JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    // Anything reaching this handler is NOT a std::exception (caught above),
+    // so report it as an unknown exception like the sibling JNI methods do.
+    JNU_ThrowByName(jenv, "java/io/IOException", "[NativeRuntimeJniImpl] Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIConfigure
+ * Signature: ([[B)V
+ *
+ * configs is consumed as consecutive (key, value) byte-array pairs; a
+ * trailing unpaired element is ignored. Each pair is stored in the global
+ * Config. Native exceptions are mapped onto Java exceptions.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIConfigure(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jobjectArray configs) {
+  try {
+    NativeTask::Config & config = NativeTask::NativeObjectFactory::GetConfig();
+    jsize len = jenv->GetArrayLength(configs);
+    for (jsize i = 0; i + 1 < len; i += 2) {
+      jbyteArray key_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i);
+      jbyteArray val_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i + 1);
+      std::string key = JNU_ByteArrayToString(jenv, key_obj);
+      std::string value = JNU_ByteArrayToString(jenv, val_obj);
+      // Each GetObjectArrayElement() creates a local reference. Release them
+      // per iteration so a large configuration cannot exhaust the local
+      // reference table before this native method returns.
+      if (NULL != key_obj) {
+        jenv->DeleteLocalRef(key_obj);
+      }
+      if (NULL != val_obj) {
+        jenv->DeleteLocalRef(val_obj);
+      }
+      config.set(key, value);
+    }
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    // Not a std::exception (those are caught above).
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNICreateNativeObject
+ * Signature: ([B)J
+ *
+ * Creates a native object of the class named by clazz and returns its
+ * address as a jlong. Returns 0 when an exception was raised (or when
+ * CreateObject() itself returned NULL).
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateNativeObject(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jbyteArray clazz) {
+  try {
+    std::string typeString = JNU_ByteArrayToString(jenv, clazz);
+    return (jlong)(NativeTask::NativeObjectFactory::CreateObject(typeString));
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+  return 0;
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNICreateDefaultNativeObject
+ * Signature: ([B)J
+ *
+ * Decodes the object type name carried in the byte array, creates the
+ * default object for that type and returns its address as a jlong.
+ * Returns 0 when an exception was raised.
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateDefaultNativeObject(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jbyteArray type) {
+  try {
+    std::string typeName = JNU_ByteArrayToString(jenv, type);
+    NativeTask::NativeObjectType objectType =
+        NativeTask::NativeObjectTypeFromString(typeName.c_str());
+    NativeTask::NativeObject * created =
+        NativeTask::NativeObjectFactory::CreateDefaultObject(objectType);
+    return (jlong)(created);
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("[NativeRuntimeJniImpl] JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "[NativeRuntimeJniImpl] Unknown exception");
+  }
+  return 0;
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIReleaseNativeObject
+ * Signature: (J)V
+ *
+ * Treats objectAddr as a NativeObject pointer and releases it. A zero
+ * address raises java.lang.IllegalArgumentException instead.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIReleaseNativeObject(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jlong objectAddr) {
+  try {
+    NativeTask::NativeObject * target = (NativeTask::NativeObject *)objectAddr;
+    if (NULL != target) {
+      NativeTask::NativeObjectFactory::ReleaseObject(target);
+    } else {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "Object addr not instance of NativeObject");
+    }
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIRegisterModule
+ * Signature: ([B[B)I
+ *
+ * Registers the library given by (modulePath, moduleName).
+ * Returns 0 on success and 1 when registration failed or an exception
+ * was raised.
+ */
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRegisterModule(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jbyteArray modulePath, jbyteArray moduleName) {
+  try {
+    std::string libraryPath = JNU_ByteArrayToString(jenv, modulePath);
+    std::string libraryName = JNU_ByteArrayToString(jenv, moduleName);
+    const bool registered =
+        NativeTask::NativeObjectFactory::RegisterLibrary(libraryPath, libraryName);
+    if (registered) {
+      return 0;
+    }
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+  return 1;
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIUpdateStatus
+ * Signature: ()[B
+ *
+ * Fetches the task status update from NativeObjectFactory and returns it
+ * as a new Java byte array. Returns NULL when the array could not be
+ * allocated or when an exception was raised.
+ */
+JNIEXPORT jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIUpdateStatus(
+    JNIEnv * jenv, jclass nativeRuntimeClass) {
+  try {
+    std::string statusData;
+    NativeTask::NativeObjectFactory::GetTaskStatusUpdate(statusData);
+    jbyteArray ret = jenv->NewByteArray(statusData.length());
+    if (NULL == ret) {
+      // NewByteArray() returns NULL when the array cannot be constructed;
+      // the JVM is expected to have a pending exception in that case, so
+      // return without touching the array.
+      return NULL;
+    }
+    jenv->SetByteArrayRegion(ret, 0, statusData.length(), (jbyte*)statusData.c_str());
+    return ret;
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+  return NULL;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc
new file mode 100644
index 0000000..5c42e56
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdlib.h>
+#ifndef __CYGWIN__
+#include <execinfo.h>
+#endif
+#include "commons.h"
+#include "util/Hash.h"
+#include "util/StringUtil.h"
+#include "NativeTask.h"
+#include "NativeObjectFactory.h"
+
+namespace NativeTask {
+
+//////////////////////////////////////////////////////////////////
+// NativeObjectType methods
+//////////////////////////////////////////////////////////////////
+
+/**
+ * Return the textual name of a NativeObjectType value.
+ * Values without a dedicated case yield "UnknownObjectType".
+ */
+const string NativeObjectTypeToString(NativeObjectType type) {
+  const char * label = "UnknownObjectType";
+  switch (type) {
+  case BatchHandlerType:
+    label = "BatchHandlerType";
+    break;
+  case MapperType:
+    label = "MapperType";
+    break;
+  case ReducerType:
+    label = "ReducerType";
+    break;
+  case PartitionerType:
+    label = "PartitionerType";
+    break;
+  case CombinerType:
+    label = "CombinerType";
+    break;
+  case FolderType:
+    label = "FolderType";
+    break;
+  case RecordReaderType:
+    label = "RecordReaderType";
+    break;
+  case RecordWriterType:
+    label = "RecordWriterType";
+    break;
+  default:
+    break;
+  }
+  return string(label);
+}
+
+/**
+ * Parse the textual name of a NativeObjectType (the inverse of
+ * NativeObjectTypeToString).
+ * @return the matching enum value, or UnknownObjectType for any other name
+ */
+NativeObjectType NativeObjectTypeFromString(const string type) {
+  if (type == "BatchHandlerType") {
+    return BatchHandlerType;
+  } else if (type == "MapperType") {
+    return MapperType;
+  } else if (type == "ReducerType") {
+    return ReducerType;
+  } else if (type == "PartitionerType") {
+    return PartitionerType;
+  } else if (type == "CombinerType") {
+    return CombinerType;
+  } else if (type == "FolderType") {
+    // Previously returned CombinerType, so "FolderType" did not round-trip.
+    return FolderType;
+  } else if (type == "RecordReaderType") {
+    return RecordReaderType;
+  } else if (type == "RecordWriterType") {
+    return RecordWriterType;
+  }
+  return UnknownObjectType;
+}
+
+/**
+ * Build the exception reason from `what`, then (except on Cygwin) append
+ * the current call stack, one "\n\t"-prefixed frame per line.
+ *
+ * When `what` starts with '/' and contains a ':', the leading directories
+ * of the path before the ':' are dropped: the reason starts at the last
+ * '/' found before the ':'.
+ */
+HadoopException::HadoopException(const string & what) {
+  // remove long path prefix
+  size_t n = 0;
+  if (what[0] == '/') {
+    size_t p = what.find(':');
+    if (p != what.npos) {
+      while (true) {
+        size_t np = what.find('/', n + 1);
+        if (np == what.npos || np >= p) {
+          break;
+        }
+        n = np;
+      }
+    }
+  }
+  _reason.append(what.c_str() + n, what.length() - n);
+
+#ifndef __CYGWIN__
+  void * frames[64];
+  size_t depth = backtrace(frames, 64);
+  char ** traces = backtrace_symbols(frames, depth);
+  // backtrace_symbols() returns a malloc()ed array, or NULL on failure.
+  if (NULL != traces) {
+    for (size_t i = 0; i < depth; i++) {
+      _reason.append("\n\t");
+      _reason.append(traces[i]);
+    }
+    // Only the array itself must be freed, not the strings it points to.
+    free(traces);
+  }
+#endif
+}
+
+///////////////////////////////////////////////////////////
+
+/**
+ * Load "key=value" settings from a text file.
+ *
+ * Lines whose first character is '#' and lines without '=' are ignored.
+ * The key is the text before the first '='; the value is the remainder of
+ * the line passed through StringUtil::Trim().
+ *
+ * Lines are assembled from 256-byte reads, so lines of any length are
+ * handled as a whole, and a final line without a trailing newline is
+ * processed as well (both were previously dropped or mis-parsed).
+ *
+ * Throws IOException (via THROW_EXCEPTION) when the file cannot be opened.
+ */
+void Config::load(const string & path) {
+  FILE * fin = fopen(path.c_str(), "r");
+  if (NULL == fin) {
+    THROW_EXCEPTION(IOException, "file not found or can not open for read");
+  }
+  char buff[256];
+  std::string line;
+  bool more = true;
+  while (more) {
+    more = (fgets(buff, sizeof(buff), fin) != NULL);
+    if (more) {
+      line.append(buff);
+      if (line.empty() || line[line.length() - 1] != '\n') {
+        // Partial read of a long line (or last line so far): keep reading.
+        continue;
+      }
+    }
+    // Here `line` is either a complete line or, at end of file, whatever
+    // was left without a terminating newline (possibly nothing).
+    if (!line.empty() && line[0] != '#') {
+      size_t br = line.find('=');
+      if (br != line.npos) {
+        set(line.substr(0, br), StringUtil::Trim(line.substr(br + 1)));
+      }
+    }
+    line.clear();
+  }
+  fclose(fin);
+}
+
+/**
+ * Store value under key, replacing any existing entry for that key.
+ */
+void Config::set(const string & key, const string & value) {
+  _configs[key] = value;
+}
+
+/**
+ * Store an integer setting; the value is kept in the string form produced
+ * by StringUtil::ToString().
+ */
+void Config::setInt(const string & name, int64_t value) {
+  string & slot = _configs[name];
+  slot = StringUtil::ToString(value);
+}
+
+/**
+ * Store a boolean setting; the value is kept in the string form produced
+ * by StringUtil::ToString().
+ */
+void Config::setBool(const string & name, bool value) {
+  string & slot = _configs[name];
+  slot = StringUtil::ToString(value);
+}
+
+/**
+ * Read "key=value" settings from an argument vector.
+ *
+ * Arguments without '=' are logged and skipped; arguments that contain '='
+ * but start with '-' are logged and skipped as well. When a key is already
+ * present, the new value is appended to the existing one after a comma.
+ */
+void Config::parse(int32_t argc, const char ** argv) {
+  for (int32_t idx = 0; idx < argc; ++idx) {
+    const char * arg = argv[idx];
+    const char * equ = strchr(arg, '=');
+    if (NULL == equ) {
+      LOG("[NativeTask] config argument not recognized: %s", arg);
+      continue;
+    }
+    if ('-' == arg[0]) {
+      LOG("[NativeTask] config argument with '-' prefix ignored: %s", arg);
+      continue;
+    }
+    const string key(arg, equ - arg);
+    const string value(equ + 1);
+    map<string, string>::iterator existing = _configs.find(key);
+    if (existing != _configs.end()) {
+      // Repeated key: accumulate the values as a comma-separated list.
+      existing->second.append(",");
+      existing->second.append(value);
+    } else {
+      _configs[key] = value;
+    }
+  }
+}
+
+/**
+ * Look up a setting.
+ * @return pointer to the stored value's characters (owned by the stored
+ *         string inside this Config), or NULL when the name is not set
+ */
+const char * Config::get(const string & name) {
+  map<string, string>::iterator entry = _configs.find(name);
+  if (entry != _configs.end()) {
+    return entry->second.c_str();
+  }
+  return NULL;
+}
+
+/**
+ * Look up a setting, falling back to defaultValue when the name is not set.
+ * @return a copy of the stored value or of defaultValue
+ */
+string Config::get(const string & name, const string & defaultValue) {
+  map<string, string>::iterator entry = _configs.find(name);
+  if (entry != _configs.end()) {
+    return entry->second;
+  }
+  return defaultValue;
+}
+
+/**
+ * Look up a setting and convert it with StringUtil::toInt().
+ * @return the converted value, or defaultValue when the name is not set
+ */
+int64_t Config::getInt(const string & name, int64_t defaultValue) {
+  map<string, string>::iterator entry = _configs.find(name);
+  if (entry != _configs.end()) {
+    return StringUtil::toInt(entry->second);
+  }
+  return defaultValue;
+}
+
+/**
+ * Look up a setting and convert it with StringUtil::toBool().
+ * @return the converted value, or defaultValue when the name is not set
+ */
+bool Config::getBool(const string & name, bool defaultValue) {
+  map<string, string>::iterator entry = _configs.find(name);
+  if (entry != _configs.end()) {
+    return StringUtil::toBool(entry->second);
+  }
+  return defaultValue;
+}
+
+/**
+ * Look up a setting and convert it with StringUtil::toFloat().
+ * @return the converted value, or defaultValue when the name is not set
+ */
+float Config::getFloat(const string & name, float defaultValue) {
+  map<string, string>::iterator entry = _configs.find(name);
+  if (entry != _configs.end()) {
+    return StringUtil::toFloat(entry->second);
+  }
+  return defaultValue;
+}
+
+/**
+ * Split a comma-separated setting with StringUtil::Split() into dest.
+ * dest is left untouched when the name is not set.
+ */
+void Config::getStrings(const string & name, vector<string> & dest) {
+  map<string, string>::iterator entry = _configs.find(name);
+  if (entry == _configs.end()) {
+    return;
+  }
+  StringUtil::Split(entry->second, ",", dest, true);
+}
+
+/**
+ * Read a comma-separated setting and append each element, converted with
+ * StringUtil::toInt(), to dest.
+ */
+void Config::getInts(const string & name, vector<int64_t> & dest) {
+  vector<string> parts;
+  getStrings(name, parts);
+  for (vector<string>::iterator part = parts.begin(); part != parts.end(); ++part) {
+    dest.push_back(StringUtil::toInt(*part));
+  }
+}
+
+/**
+ * Read a comma-separated setting and append each element, converted with
+ * StringUtil::toFloat(), to dest.
+ */
+void Config::getFloats(const string & name, vector<float> & dest) {
+  vector<string> parts;
+  getStrings(name, parts);
+  for (vector<string>::iterator part = parts.begin(); part != parts.end(); ++part) {
+    dest.push_back(StringUtil::toFloat(*part));
+  }
+}
+
+///////////////////////////////////////////////////////////
+
+/**
+ * Counter lookup for a processor. This implementation ignores both
+ * arguments and always returns NULL.
+ */
+Counter * ProcessorBase::getCounter(const string & group, const string & name) {
+  return NULL;
+}
+
+/**
+ * Map a key to a partition index.
+ * With a single partition the answer is always 0; otherwise the key's
+ * Hash::BytesHash() value, with the top bit masked off, is reduced modulo
+ * numPartition.
+ */
+uint32_t Partitioner::getPartition(const char * key, uint32_t & keyLen, uint32_t numPartition) {
+  if (numPartition != 1) {
+    return (Hash::BytesHash(key, keyLen) & 0x7fffffff) % numPartition;
+  }
+  return 0;
+}
+
+///////////////////////////////////////////////////////////
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc
new file mode 100644
index 0000000..55ed4b2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/Timer.h"
+#include "util/StringUtil.h"
+#include "NativeObjectFactory.h"
+#include "PartitionBucket.h"
+#include "Merge.h"
+#include "NativeTask.h"
+#include "WritableUtils.h"
+#include "util/DualPivotQuickSort.h"
+#include "Combiner.h"
+#include "TaskCounters.h"
+#include "MinHeap.h"
+#include "PartitionBucketIterator.h"
+
+namespace NativeTask {
+
+/**
+ * Create an iterator over this bucket's key/value pairs.
+ * @return a newly allocated PartitionBucketIterator, or NULL when the
+ *         bucket holds no memory blocks
+ */
+KVIterator * PartitionBucket::getIterator() {
+  if (_memBlocks.empty()) {
+    return NULL;
+  }
+  KVIterator * created = new PartitionBucketIterator(this, _keyComparator);
+  return created;
+}
+
+/**
+ * Write this bucket's content to writer.
+ *
+ * Without a combine runner every key/value pair from the bucket iterator is
+ * written directly; with one, the iterator and writer are handed to
+ * ICombineRunner::combine(). Nothing happens when writer is NULL or the
+ * bucket has no data.
+ *
+ * The iterator is released on every path, including when writing or
+ * combining throws (previously it leaked then, and when writer was NULL).
+ */
+void PartitionBucket::spill(IFileWriter * writer) throw (IOException, UnsupportException) {
+  if (NULL == writer) {
+    // Check before allocating the iterator so there is nothing to clean up.
+    return;
+  }
+  KVIterator * iterator = getIterator();
+  if (NULL == iterator) {
+    return;
+  }
+
+  try {
+    if (_combineRunner == NULL) {
+      Buffer key;
+      Buffer value;
+
+      while (iterator->next(key, value)) {
+        writer->write(key.data(), key.length(), value.data(), value.length());
+      }
+    } else {
+      _combineRunner->combine(CombineContext(UNKNOWN), iterator, writer);
+    }
+  } catch (...) {
+    delete iterator;
+    throw;
+  }
+  delete iterator;
+}
+
+/**
+ * Sort every memory block of this bucket with the given algorithm and the
+ * bucket's key comparator, then mark the bucket as sorted.
+ * Does nothing for an empty bucket; blocks are not re-sorted when the
+ * bucket is already marked sorted.
+ */
+void PartitionBucket::sort(SortAlgorithm type) {
+  if (_memBlocks.empty()) {
+    return;
+  }
+  if (!_sorted) {
+    const uint32_t blockCount = _memBlocks.size();
+    for (uint32_t idx = 0; idx < blockCount; ++idx) {
+      _memBlocks[idx]->sort(type, _keyComparator);
+    }
+  }
+  _sorted = true;
+}
+
+}
+;
+// namespace NativeTask

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h
new file mode 100644
index 0000000..37c45af
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PARTITION_BUCKET_H_
+#define PARTITION_BUCKET_H_
+
+#include "NativeTask.h"
+#include "MemoryPool.h"
+#include "MemoryBlock.h"
+#include "Timer.h"
+#include "Buffers.h"
+#include "MapOutputSpec.h"
+#include "IFile.h"
+#include "SpillInfo.h"
+#include "Combiner.h"
+
+namespace NativeTask {
+
+/**
+ * Buffer for a single partition.
+ *
+ * Key/value pairs are stored in a list of MemoryBlocks whose backing memory
+ * is requested from a MemoryPool. The MemoryBlock objects themselves are
+ * created here and deleted by reset() / the destructor.
+ *
+ * NOTE(review): the class deletes raw pointers in its destructor but does not
+ * disable copying; copying an instance would lead to a double delete.
+ */
+class PartitionBucket {
+  friend class PartitionBucketIterator;
+  friend class TestPartitionBucket;
+
+private:
+  // blocks holding the key/value pairs, in allocation order
+  std::vector<MemoryBlock *> _memBlocks;
+  // source of the raw memory behind each block; never NULL after construction
+  MemoryPool * _pool;
+  // id of the partition this bucket belongs to
+  uint32_t _partition;
+  // preferred size requested from the pool for a new block
+  uint32_t _blockSize;
+  // comparator used when sorting blocks and merging them in the iterator
+  ComparatorPtr _keyComparator;
+  // optional combiner applied while spilling; NULL when none is configured
+  ICombineRunner * _combineRunner;
+  // true once sort() ran and no buffer has been allocated since
+  bool _sorted;
+
+public:
+  /**
+   * @param pool          memory pool to allocate block memory from
+   * @param partition     partition id
+   * @param comparator    key comparator
+   * @param combineRunner combiner to run during spill, may be NULL
+   * @param blockSize     preferred size of a newly allocated block
+   * @throws IOException if pool or comparator is NULL
+   */
+  PartitionBucket(MemoryPool * pool, uint32_t partition, ComparatorPtr comparator,
+      ICombineRunner * combineRunner, uint32_t blockSize)
+      // initializers listed in member declaration order (avoids -Wreorder)
+      : _pool(pool), _partition(partition), _blockSize(blockSize),
+          _keyComparator(comparator), _combineRunner(combineRunner), _sorted(false) {
+    if (NULL == _pool || NULL == comparator) {
+      THROW_EXCEPTION_EX(IOException, "pool is NULL, or comparator is not set");
+    }
+
+    if (NULL != combineRunner) {
+      LOG("[PartitionBucket] combine runner has been set");
+    }
+  }
+
+  ~PartitionBucket() {
+    reset();
+  }
+
+  uint32_t getPartitionId() {
+    return _partition;
+  }
+
+  /**
+   * Delete every MemoryBlock object and empty the block list.
+   */
+  void reset() {
+    for (uint32_t i = 0; i < _memBlocks.size(); i++) {
+      if (NULL != _memBlocks[i]) {
+        delete _memBlocks[i];
+        _memBlocks[i] = NULL;
+      }
+    }
+    _memBlocks.clear();
+  }
+
+  KVIterator * getIterator();
+
+  /**
+   * @return the sum of the key/value counts of all blocks
+   */
+  uint32_t getKVCount() const {
+    uint32_t size = 0;
+    for (uint32_t i = 0; i < _memBlocks.size(); i++) {
+      MemoryBlock * block = _memBlocks[i];
+      if (NULL != block) {
+        size += block->getKVCount();
+      }
+    }
+    return size;
+  }
+
+  /**
+   * Reserve room for one key/value pair of kvLength bytes.
+   *
+   * The most recently added block is used when it has enough space left;
+   * otherwise a new block of max(_blockSize, kvLength) bytes (at least
+   * kvLength) is requested from the pool. Marks the bucket as unsorted.
+   *
+   * @param kvLength number of bytes needed
+   * @return the reserved buffer, or NULL when kvLength is 0 or the pool could
+   *         not provide memory
+   *
+   * NOTE(review): the original comment stated
+   * "@throws OutOfMemoryException if total_length > io.sort.mb". Nothing in
+   * this method throws it directly; it would have to come from
+   * MemoryPool::allocate - confirm.
+   */
+  KVBuffer * allocateKVBuffer(uint32_t kvLength) {
+    if (kvLength == 0) {
+      LOG("KV Length is empty, no need to allocate buffer for it");
+      return NULL;
+    }
+    _sorted = false;
+
+    MemoryBlock * memBlock = NULL;
+    if (!_memBlocks.empty()) {
+      memBlock = _memBlocks.back();
+    }
+    // Test the block pointer itself (the original compared the block count
+    // against NULL).
+    if (NULL != memBlock && memBlock->remainSpace() >= kvLength) {
+      return memBlock->allocateKVBuffer(kvLength);
+    }
+
+    uint32_t min = kvLength;
+    uint32_t expect = std::max(_blockSize, min);
+    uint32_t allocated = 0;
+    char * buff = _pool->allocate(min, expect, allocated);
+    if (NULL == buff) {
+      // both values are unsigned, so print them with %u
+      LOG("MemoryPool is full, fail to allocate new MemBlock, block size: %u, kv length: %u", expect, kvLength);
+      return NULL;
+    }
+
+    memBlock = new MemoryBlock(buff, allocated);
+    _memBlocks.push_back(memBlock);
+    return memBlock->allocateKVBuffer(kvLength);
+  }
+
+  void sort(SortAlgorithm type);
+
+  void spill(IFileWriter * writer) throw (IOException, UnsupportException);
+
+  uint32_t getMemoryBlockCount() const {
+    return _memBlocks.size();
+  }
+
+  /**
+   * @param index position of the block; not range checked, must be smaller
+   *              than getMemoryBlockCount()
+   */
+  MemoryBlock * getMemoryBlock(uint32_t index) const {
+    return _memBlocks[index];
+  }
+};
+
+}
+;
+//namespace NativeTask
+
+#endif /* PARTITION_BUCKET_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc
new file mode 100644
index 0000000..4a2ca5d
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/Timer.h"
+#include "util/StringUtil.h"
+#include "NativeObjectFactory.h"
+#include "PartitionBucketIterator.h"
+#include "Merge.h"
+#include "NativeTask.h"
+#include "WritableUtils.h"
+#include "util/DualPivotQuickSort.h"
+#include "Combiner.h"
+#include "TaskCounters.h"
+#include "MinHeap.h"
+
+namespace NativeTask {
+
+/////////////////////////////////////////////////////////////////
+// PartitionBucketIterator
+/////////////////////////////////////////////////////////////////
+
+/**
+ * Build an iterator over all memory blocks of the given bucket.
+ *
+ * One MemBlockIterator is created per block and advanced once; iterators that
+ * yield at least one entry are collected and arranged into a heap using the
+ * comparator.
+ *
+ * @param pb         bucket to iterate over
+ * @param comparator key comparator used to order the per-block iterators
+ */
+PartitionBucketIterator::PartitionBucketIterator(PartitionBucket * pb, ComparatorPtr comparator)
+    : _pb(pb), _comparator(comparator), _first(true) {
+  uint32_t blockCount = _pb->getMemoryBlockCount();
+  _heap.reserve(blockCount);
+  for (uint32_t i = 0; i < blockCount; i++) {
+    MemoryBlock * block = _pb->getMemoryBlock(i);
+    MemBlockIteratorPtr blockIterator = new MemBlockIterator(block);
+    if (blockIterator->next()) {
+      _heap.push_back(blockIterator);
+    } else {
+      // Nothing to read from this block: the iterator never enters the heap,
+      // so the destructor would not see it. Free it here.
+      delete blockIterator;
+    }
+  }
+  if (_heap.size() > 0) {
+    makeHeap(&(_heap[0]), &(_heap[0]) + _heap.size(), _comparator);
+  }
+}
+
+/**
+ * Delete every per-block iterator that is still held in the heap.
+ */
+PartitionBucketIterator::~PartitionBucketIterator() {
+  typedef std::vector<MemBlockIteratorPtr>::iterator HeapIter;
+  for (HeapIter it = _heap.begin(); it != _heap.end(); ++it) {
+    if (NULL == *it) {
+      continue;
+    }
+    delete *it;
+    *it = NULL;
+  }
+}
+
+/**
+ * Move to the next key/value pair in comparator order.
+ *
+ * The very first call does not advance anything: the heap built by the
+ * constructor already has the first entry at its top. Every later call
+ * advances the per-block iterator at the top of the heap and restores the
+ * heap order, or removes that iterator when its block is exhausted.
+ *
+ * @return true when _heap[0] refers to a valid entry afterwards, false when
+ *         all blocks are exhausted
+ */
+bool PartitionBucketIterator::next() {
+  size_t cur_heap_size = _heap.size();
+  if (cur_heap_size == 0) {
+    return false;
+  }
+  if (_first) {
+    _first = false;
+    return true;
+  }
+
+  MemBlockIteratorPtr * base = &(_heap[0]);
+  if (base[0]->next()) { // have more, adjust heap
+    if (cur_heap_size == 2) {
+      if (_comparator(base[1], base[0])) {
+        std::swap(base[0], base[1]);
+      }
+    } else if (cur_heap_size > 2) {
+      heapify(base, 1, cur_heap_size, _comparator);
+    }
+    return true;
+  }
+
+  // No more entries in the top block: drop its iterator from the heap.
+  // Remember the pointer first so it can be freed no matter how popHeap
+  // rearranges the array; previously it was dropped without being deleted.
+  // NOTE(review): assumes the KVBuffer handed out earlier lives in the
+  // MemoryBlock rather than in the iterator - confirm.
+  MemBlockIteratorPtr exhausted = base[0];
+  popHeap(base, base + cur_heap_size, _comparator);
+  _heap.pop_back();
+  delete exhausted;
+  return _heap.size() > 0;
+}
+
+/**
+ * Advance to the next key/value pair and expose it through key and value.
+ *
+ * @param key   reset to the key bytes of the current entry
+ * @param value reset to the value bytes of the current entry
+ * @return false when there are no more entries (key and value are untouched)
+ */
+bool PartitionBucketIterator::next(Buffer & key, Buffer & value) {
+  if (!next()) {
+    return false;
+  }
+
+  // the entry at the top of the heap is the current one
+  KVBuffer * current = _heap[0]->getKVBuffer();
+  key.reset(current->getKey(), current->keyLength);
+  value.reset(current->getValue(), current->valueLength);
+  return true;
+}
+}
+;
+// namespace NativeTask
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h
new file mode 100644
index 0000000..beabff8
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PARTITION_BUCKET_ITERATOR_H_
+#define PARTITION_BUCKET_ITERATOR_H_
+
+#include "NativeTask.h"
+#include "MemoryPool.h"
+#include "Timer.h"
+#include "Buffers.h"
+#include "MapOutputSpec.h"
+#include "IFile.h"
+#include "SpillInfo.h"
+#include "Combiner.h"
+#include "PartitionBucket.h"
+
+namespace NativeTask {
+
+/**
+ * KVIterator over all key/value pairs of a PartitionBucket.
+ *
+ * One MemBlockIterator is kept per non-empty memory block; they are held in
+ * a heap ordered by the comparator so that the entry at _heap[0] is the
+ * current one.
+ */
+class PartitionBucketIterator : public KVIterator {
+protected:
+  // bucket being iterated; not deleted by this class
+  PartitionBucket * _pb;
+  // per-block iterators arranged as a heap; remaining entries are deleted
+  // in the destructor
+  std::vector<MemBlockIteratorPtr> _heap;
+  // ordering used for the heap, built from the key comparator
+  MemBlockComparator _comparator;
+  // true until next() has been called once
+  bool _first;
+
+public:
+  PartitionBucketIterator(PartitionBucket * pb, ComparatorPtr comparator);
+  virtual ~PartitionBucketIterator();
+  // Advances and fills key/value; returns false when no entries are left.
+  virtual bool next(Buffer & key, Buffer & value);
+
+private:
+  // Advances the heap only; returns false when no entries are left.
+  bool next();
+};
+
+}
+;
+//namespace NativeTask
+
+#endif /* PARTITION_BUCKET_ITERATOR_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc
new file mode 100644
index 0000000..6d85fe2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Path.h"
+
+namespace NativeTask {
+
+/**
+ * @return true when path is non-empty and begins with '/'
+ */
+bool Path::IsAbsolute(const string & path) {
+  return !path.empty() && path[0] == '/';
+}
+
+/**
+ * Return the part of path that precedes its last '/'.
+ *
+ * Special cases:
+ *   - no '/' at all           -> "."
+ *   - path is exactly "/"     -> ""
+ *   - only a leading '/'      -> path itself (e.g. "/foo" -> "/foo")
+ *
+ * NOTE(review): returning the path itself for "/foo" is surprising ("/"
+ * would be the usual parent); behavior kept as is - confirm with callers.
+ */
+string Path::GetParent(const string & path) {
+  const size_t slashPos = path.rfind('/');
+  if (slashPos == string::npos) {
+    return ".";
+  }
+  if (slashPos != 0) {
+    return path.substr(0, slashPos);
+  }
+  // the only slash is the leading one
+  if (path.length() == 1) {
+    return "";
+  }
+  return path;
+}
+
+/**
+ * @return the part of path after its last '/', or the whole path when it
+ *         contains no '/'
+ */
+string Path::GetName(const string & path) {
+  const size_t slashPos = path.rfind('/');
+  if (slashPos != string::npos) {
+    return path.substr(slashPos + 1);
+  }
+  return path;
+}
+
+} // namespace NativeTask
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h
new file mode 100644
index 0000000..0dc82f2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PATH_H_
+#define PATH_H_
+
+#include <stdint.h>
+#include <string>
+
+namespace NativeTask {
+
+using std::string;
+
+/**
+ * Static helpers for '/'-separated path strings.
+ */
+class Path {
+public:
+  // True when path is non-empty and starts with '/'.
+  static bool IsAbsolute(const string & path);
+  // Part of path before the last '/'; "." when path contains no '/'.
+  static string GetParent(const string & path);
+  // Part of path after the last '/'; the whole path when it contains no '/'.
+  static string GetName(const string & path);
+};
+
+} // namespace NativeTask
+
+#endif /* PATH_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc
new file mode 100644
index 0000000..bce2cb2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "Streams.h"
+#include "FileSystem.h"
+#include "Buffers.h"
+#include "SpillInfo.h"
+
+namespace NativeTask {
+
+/**
+ * Remove the spill file at `path` if a path is set and stat() succeeds on it.
+ * The result of remove() is not checked.
+ */
+void SingleSpillInfo::deleteSpillFile() {
+  if (path.empty()) {
+    return;
+  }
+
+  struct stat st;
+  if (stat(path.c_str(), &st) != 0) {
+    return;
+  }
+  remove(path.c_str());
+}
+
+/**
+ * Write the segment index of this spill to a local file.
+ *
+ * For each segment three 64-bit values are written through a CRC32
+ * checksumming stream: the start offset (real end offset of the previous
+ * segment, 0 for the first), the uncompressed length and the real length
+ * (both computed as the difference to the previous segment's end offsets).
+ * The checksum is then appended directly to the file, byte swapped, as
+ * 8 bytes (or 4 bytes when SPILLRECORD_CHECKSUM_UINT is defined).
+ *
+ * @param filepath path of the index file; created with overwrite enabled
+ */
+void SingleSpillInfo::writeSpillInfo(const std::string & filepath) {
+  OutputStream * fout = FileSystem::getLocal().create(filepath, true);
+  try {
+    {
+      // Inner scope: dest and appendBuffer must be gone before fout is closed.
+      ChecksumOutputStream dest = ChecksumOutputStream(fout, CHECKSUM_CRC32);
+      AppendBuffer appendBuffer;
+      appendBuffer.init(32 * 1024, &dest, "");
+      uint64_t base = 0;
+
+      // End offsets of the previous segment; 0 before the first one, which
+      // makes the first record (base, uncompressedEndOffset, realEndOffset).
+      uint64_t prevUncompressedEnd = 0;
+      uint64_t prevRealEnd = 0;
+
+      for (size_t j = 0; j < this->length; j++) {
+        const IFileSegment & segment = this->segments[j];
+        appendBuffer.write_uint64_be(base + prevRealEnd);
+        appendBuffer.write_uint64_be(segment.uncompressedEndOffset - prevUncompressedEnd);
+        appendBuffer.write_uint64_be(segment.realEndOffset - prevRealEnd);
+        prevUncompressedEnd = segment.uncompressedEndOffset;
+        prevRealEnd = segment.realEndOffset;
+      }
+      appendBuffer.flush();
+      uint32_t chsum = dest.getChecksum();
+#ifdef SPILLRECORD_CHECKSUM_UINT
+      chsum = bswap(chsum);
+      fout->write(&chsum, sizeof(uint32_t));
+#else
+      uint64_t wtchsum = bswap64((uint64_t)chsum);
+      fout->write(&wtchsum, sizeof(uint64_t));
+#endif
+    }
+    fout->close();
+  } catch (...) {
+    // Do not leak the stream object when writing or closing fails.
+    delete fout;
+    throw;
+  }
+  delete fout;
+}
+
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h
new file mode 100644
index 0000000..94eb16e
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PARTITIONINDEX_H_
+#define PARTITIONINDEX_H_
+
+#include <stdint.h>
+#include <string>
+
+namespace NativeTask {
+
+using std::string;
+
+/**
+ * Store spill file segment information.
+ *
+ * Both fields are end positions; SingleSpillInfo::writeSpillInfo derives a
+ * segment's length by subtracting the previous segment's end offsets.
+ */
+struct IFileSegment {
+  // uncompressed stream end position
+  uint64_t uncompressedEndOffset;
+  // compressed stream end position
+  uint64_t realEndOffset;
+};
+
+/**
+ * Description of one spill file: its path, the end offsets of each segment,
+ * and the checksum type, key/value types and codec it was created with.
+ *
+ * The segments array is owned by this object and released with delete[] in
+ * the destructor.
+ *
+ * NOTE(review): copying is not disabled; a copy would delete[] the same
+ * array twice.
+ */
+class SingleSpillInfo {
+public:
+  // number of entries in segments
+  uint32_t length;
+  // location of the spill file
+  std::string path;
+  // array of `length` segments; freed with delete[] by the destructor
+  IFileSegment * segments;
+  ChecksumType checkSumType;
+  KeyValueType keyType;
+  KeyValueType valueType;
+  std::string codec;
+
+  /**
+   * @param segments array allocated with new[]; ownership passes to this object
+   * @param len      number of entries in segments
+   */
+  SingleSpillInfo(IFileSegment * segments, uint32_t len, const string & path, ChecksumType checksum,
+      KeyValueType ktype, KeyValueType vtype, const string & inputCodec)
+      : length(len), path(path), segments(segments), checkSumType(checksum), keyType(ktype),
+          valueType(vtype), codec(inputCodec) {
+  }
+
+  ~SingleSpillInfo() {
+    delete[] segments;
+  }
+
+  void deleteSpillFile();
+
+  /**
+   * @return uncompressed end offset of the last segment, or 0 when there are
+   *         no segments
+   */
+  uint64_t getEndPosition() {
+    // length == 0 must not index segments[length - 1] (unsigned wrap-around)
+    return (segments && length > 0) ? segments[length - 1].uncompressedEndOffset : 0;
+  }
+
+  /**
+   * @return real end offset of the last segment, or 0 when there are no
+   *         segments
+   */
+  uint64_t getRealEndPosition() {
+    return (segments && length > 0) ? segments[length - 1].realEndOffset : 0;
+  }
+
+  void writeSpillInfo(const std::string & filepath);
+};
+
+/**
+ * Collection of SingleSpillInfo objects. Every pointer added is deleted by
+ * the destructor.
+ */
+class SpillInfos {
+public:
+  std::vector<SingleSpillInfo*> spills;
+
+  SpillInfos() {
+  }
+
+  ~SpillInfos() {
+    typedef std::vector<SingleSpillInfo*>::iterator SpillIter;
+    for (SpillIter it = spills.begin(); it != spills.end(); ++it) {
+      delete *it;
+    }
+    spills.clear();
+  }
+
+  // Ask every registered spill to delete its file.
+  void deleteAllSpillFiles() {
+    typedef std::vector<SingleSpillInfo*>::iterator SpillIter;
+    for (SpillIter it = spills.begin(); it != spills.end(); ++it) {
+      (*it)->deleteSpillFile();
+    }
+  }
+
+  // Append a spill; it is deleted when this object is destroyed.
+  void add(SingleSpillInfo * sri) {
+    spills.push_back(sri);
+  }
+
+  uint32_t getSpillCount() const {
+    return spills.size();
+  }
+
+  // Range-checked access (std::vector::at).
+  SingleSpillInfo* getSingleSpillInfo(int index) {
+    return spills.at(index);
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* PARTITIONINDEX_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h
new file mode 100644
index 0000000..16a6685
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SPILL_OUTPUT_SERVICE_H_
+#define SPILL_OUTPUT_SERVICE_H_
+
+#include <stdint.h>
+#include <string>
+
+namespace NativeTask {
+
+class CombineHandler;
+
+using std::string;
+
+/**
+ * Abstract interface that supplies the paths used for spill and output files
+ * and the Java-side combine handler.
+ *
+ * NOTE(review): ownership of the returned pointers is not visible from this
+ * header - confirm with the implementations whether callers must delete them.
+ */
+class SpillOutputService {
+public:
+  virtual ~SpillOutputService() {}
+
+  // Path for a spill file.
+  virtual string * getSpillPath() = 0;
+  // Path for the output file.
+  virtual string * getOutputPath() = 0;
+  // Path for the index of the output file.
+  virtual string * getOutputIndexPath() = 0;
+
+  // Combine handler implemented on the Java side.
+  virtual CombineHandler * getJavaCombineHandler() = 0;
+};
+
+} // namespace NativeTask
+
+#endif /* SPILL_OUTPUT_SERVICE_H_ */


Mime
View raw message