hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [10/64] [abbrv] Import initial code for MAPREDUCE-2841 (native output collector)
Date Sat, 13 Sep 2014 01:41:15 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc
new file mode 100644
index 0000000..c2d64c9
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.cc
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "Checksum.h"
+#include "Streams.h"
+
+namespace NativeTask {
+
+/////////////////////////////////////////////////////////////
+
+void InputStream::seek(uint64_t position) {  // base default: position is ignored
+  THROW_EXCEPTION(UnsupportException, "seek not support");  // seeking is reported as unsupported
+}
+
+uint64_t InputStream::tell() {  // base default: no position is produced
+  THROW_EXCEPTION(UnsupportException, "tell not support");  // reported as unsupported instead
+}
+
+/**
+ * Keep calling read() until `length` bytes have been stored in `buff`
+ * or read() returns a non-positive count.
+ *
+ * @param buff   destination buffer, at least `length` bytes
+ * @param length number of bytes requested
+ * @return bytes actually stored (may be fewer than `length` if read()
+ *         stopped early); -1 if read() stopped before anything was
+ *         stored; 0 when `length` is 0
+ */
+int32_t InputStream::readFully(void * buff, uint32_t length) {
+  char * cursor = (char *)buff;
+  uint32_t remaining = length;
+  int32_t total = 0;
+  while (remaining > 0) {
+    const int32_t got = read(cursor, remaining);
+    if (got <= 0) {
+      // Source stopped early: report what we have, or -1 if that is nothing.
+      if (total <= 0) {
+        return -1;
+      }
+      break;
+    }
+    total += got;
+    cursor += got;
+    remaining -= got;
+  }
+  return total;
+}
+
+/**
+ * Copy everything that can still be read from this stream into `out`.
+ *
+ * Data is moved in chunks of at most `bufferHint` bytes: each chunk is
+ * obtained with read() and handed to out.write() until read() returns a
+ * non-positive count.
+ *
+ * @param out        destination stream
+ * @param bufferHint size in bytes of the temporary copy buffer
+ */
+void InputStream::readAllTo(OutputStream & out, uint32_t bufferHint) {
+  char * buffer = new char[bufferHint];
+  try {
+    while (true) {
+      int32_t rd = read(buffer, bufferHint);
+      if (rd <= 0) {
+        break;
+      }
+      out.write(buffer, rd);
+    }
+  } catch (...) {
+    // Do not leak the copy buffer when read() or write() throws.
+    delete[] buffer;
+    throw;
+  }
+  // Allocated with new[], so it has to be released with delete[].
+  delete[] buffer;
+}
+
+/////////////////////////////////////////////////////////////
+
+uint64_t OutputStream::tell() {  // base default: no position is produced
+  THROW_EXCEPTION(UnsupportException, "tell not support");  // reported as unsupported instead
+}
+
+///////////////////////////////////////////////////////////
+
+ChecksumInputStream::ChecksumInputStream(InputStream * stream, ChecksumType type)
+    : FilterInputStream(stream), _type(type), _limit(-1) {  // _limit starts negative (-1)
+  resetChecksum();  // give _checksum its initial value before any read
+}
+
+void ChecksumInputStream::resetChecksum() {  // discard the running checksum
+  _checksum = Checksum::init(_type);  // restart from Checksum::init for the configured type
+}
+
+uint32_t ChecksumInputStream::getChecksum() {  // does not modify _checksum
+  return Checksum::getValue(_type, _checksum);  // running state passed through Checksum::getValue
+}
+
+/**
+ * Read from the wrapped stream, folding every byte obtained into the
+ * running checksum and honouring the optional byte budget in _limit.
+ *
+ *   _limit <  0 : no budget, the request is passed through unchanged
+ *   _limit == 0 : budget used up, returns -1 without touching the stream
+ *   _limit >  0 : at most _limit bytes are requested; the budget shrinks
+ *                 by the number of bytes actually read
+ *
+ * @return whatever the wrapped stream's read() returned, or -1 when the
+ *         budget is exhausted
+ */
+int32_t ChecksumInputStream::read(void * buff, uint32_t length) {
+  if (_limit == 0) {
+    return -1;
+  }
+  const bool limited = _limit > 0;
+  int32_t got;
+  if (limited) {
+    int64_t want = _limit < length ? _limit : length;
+    got = _stream->read(buff, want);
+  } else {
+    got = _stream->read(buff, length);
+  }
+  if (got > 0) {
+    if (limited) {
+      _limit -= got;
+    }
+    Checksum::update(_type, _checksum, buff, got);
+  }
+  return got;
+}
+
+///////////////////////////////////////////////////////////
+
+ChecksumOutputStream::ChecksumOutputStream(OutputStream * stream, ChecksumType type)
+    : FilterOutputStream(stream), _type(type) {  // stream is handed to the filter base class
+  resetChecksum();  // give _checksum its initial value before any write
+}
+
+void ChecksumOutputStream::resetChecksum() {  // discard the running checksum
+  _checksum = Checksum::init(_type);  // restart from Checksum::init for the configured type
+}
+
+uint32_t ChecksumOutputStream::getChecksum() {  // does not modify _checksum
+  return Checksum::getValue(_type, _checksum);  // running state passed through Checksum::getValue
+}
+
+void ChecksumOutputStream::write(const void * buff, uint32_t length) {  // checksum first, then forward
+  Checksum::update(_type, _checksum, buff, length);  // fold the outgoing bytes into the running checksum
+  _stream->write(buff, length);  // hand the same bytes to the wrapped stream
+}
+
+} // namespace NativeTask

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h
new file mode 100644
index 0000000..199762b
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Streams.h
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef STREAMS_H_
+#define STREAMS_H_
+
+#include "util/Checksum.h"
+
+namespace NativeTask {
+
+class OutputStream;
+
+class InputStream {  // base class for byte sources; every virtual has a default implementation
+public:
+  InputStream() {
+  }
+
+  virtual ~InputStream() {
+  }
+
+  virtual void seek(uint64_t position);  // defined out of line
+
+  virtual uint64_t tell();  // defined out of line
+
+  virtual int32_t read(void * buff, uint32_t length) {  // default: stores nothing in buff
+    return -1;
+  }
+
+  virtual void close() {  // default: does nothing
+  }
+
+  virtual int32_t readFully(void * buff, uint32_t length);  // defined out of line
+
+  void readAllTo(OutputStream & out, uint32_t bufferHint = 1024 * 4);  // non-virtual; bufferHint defaults to 4096
+};
+
+class OutputStream {  // base class for byte sinks; every virtual has a default implementation
+public:
+  OutputStream() {
+  }
+
+  virtual ~OutputStream() {
+  }
+
+  virtual uint64_t tell();  // defined out of line
+
+  virtual void write(const void * buff, uint32_t length) {  // default: the bytes are ignored
+  }
+
+  virtual void flush() {  // default: does nothing
+  }
+
+  virtual void close() {  // default: does nothing
+  }
+};
+
+class FilterInputStream : public InputStream {  // forwards seek/tell/read to a wrapped InputStream
+protected:
+  InputStream * _stream;  // wrapped stream; replaceable through setStream()
+public:
+  FilterInputStream(InputStream * stream)
+      : _stream(stream) {
+  }
+
+  virtual ~FilterInputStream() {  // empty: the wrapped stream is not deleted here
+  }
+
+  void setStream(InputStream * stream) {  // swaps the wrapped stream; the old pointer is simply overwritten
+    _stream = stream;
+  }
+
+  InputStream * getStream() {
+    return _stream;
+  }
+
+  virtual void seek(uint64_t position) {  // forwarded unchanged
+    _stream->seek(position);
+  }
+
+  virtual uint64_t tell() {  // forwarded unchanged
+    return _stream->tell();
+  }
+
+  virtual int32_t read(void * buff, uint32_t length) {  // forwarded unchanged
+    return _stream->read(buff, length);
+  }
+};
+
+class FilterOutputStream : public OutputStream {  // forwards tell/write/flush to a wrapped OutputStream
+protected:
+  OutputStream * _stream;  // wrapped stream; replaceable through setStream()
+public:
+  FilterOutputStream(OutputStream * stream)
+      : _stream(stream) {
+  }
+
+  virtual ~FilterOutputStream() {  // empty: the wrapped stream is not deleted here
+  }
+
+  void setStream(OutputStream * stream) {  // swaps the wrapped stream; the old pointer is simply overwritten
+    _stream = stream;
+  }
+
+  OutputStream * getStream() {
+    return _stream;
+  }
+
+  virtual uint64_t tell() {  // forwarded unchanged
+    return _stream->tell();
+  }
+
+  virtual void write(const void * buff, uint32_t length) {  // forwarded unchanged
+    _stream->write(buff, length);
+  }
+
+  virtual void flush() {  // forwarded unchanged
+    _stream->flush();
+  }
+
+  virtual void close() {  // only flushes; the wrapped stream's close() is not called here
+    flush();
+  }
+};
+
+/**
+ * Input filter that caps how many bytes may be read from the wrapped
+ * stream.
+ *
+ *   _limit <  0 : no cap, reads are passed straight through
+ *   _limit == 0 : budget used up, read() returns -1
+ *   _limit >  0 : remaining budget, reduced by every successful read
+ */
+class LimitInputStream : public FilterInputStream {
+protected:
+  int64_t _limit;
+public:
+  LimitInputStream(InputStream * stream, int64_t limit)
+      : FilterInputStream(stream), _limit(limit) {
+  }
+
+  virtual ~LimitInputStream() {
+  }
+
+  /** Remaining byte budget (negative means unlimited). */
+  int64_t getLimit() {
+    return _limit;
+  }
+
+  /** Replace the remaining byte budget. */
+  void setLimit(int64_t limit) {
+    _limit = limit;
+  }
+
+  /**
+   * Read at most min(length, remaining budget) bytes.
+   *
+   * @return the wrapped stream's result, or -1 once the budget is 0
+   */
+  virtual int32_t read(void * buff, uint32_t length) {
+    if (_limit == 0) {
+      return -1;
+    }
+    if (_limit < 0) {
+      return _stream->read(buff, length);
+    }
+    int64_t allowed = length;
+    if (_limit < allowed) {
+      allowed = _limit;
+    }
+    const int32_t got = _stream->read(buff, allowed);
+    if (got > 0) {
+      _limit -= got;
+    }
+    return got;
+  }
+};
+
+class ChecksumInputStream : public FilterInputStream {  // input filter that keeps a checksum and a byte limit
+protected:
+  ChecksumType _type;  // checksum algorithm selector
+  uint32_t _checksum;  // running checksum state
+  int64_t _limit;  // byte limit, exposed through getLimit()/setLimit()
+public:
+  ChecksumInputStream(InputStream * stream, ChecksumType type);  // defined out of line
+
+  virtual ~ChecksumInputStream() {
+  }
+
+  int64_t getLimit() {
+    return _limit;
+  }
+
+  void setLimit(int64_t limit) {
+    _limit = limit;
+  }
+
+  void resetChecksum();  // defined out of line
+
+  uint32_t getChecksum();  // defined out of line
+
+  virtual int32_t read(void * buff, uint32_t length);  // overrides FilterInputStream::read; defined out of line
+};
+
+class ChecksumOutputStream : public FilterOutputStream {  // output filter that keeps a checksum
+protected:
+  ChecksumType _type;  // checksum algorithm selector
+  uint32_t _checksum;  // running checksum state
+public:
+  ChecksumOutputStream(OutputStream * stream, ChecksumType type);  // defined out of line
+
+  virtual ~ChecksumOutputStream() {
+  }
+
+  void resetChecksum();  // defined out of line
+
+  uint32_t getChecksum();  // defined out of line
+
+  virtual void write(const void * buff, uint32_t length);  // overrides FilterOutputStream::write; defined out of line
+
+};
+
+} // namespace NativeTask
+
+#endif /* STREAMS_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc
new file mode 100644
index 0000000..7aa7db8
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.cc
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TaskCounters.h"
+
+namespace NativeTask {
+
+// Defines the static member TaskCounters::<name> and initializes it with
+// the string "<name>" (the macro argument stringified). The expansion
+// already ends in ';', so uses below carry no semicolon of their own.
+#define DEFINE_COUNTER(name) const char * TaskCounters::name = #name;
+
+// Group name constant; presumably the group the counters below are
+// reported under -- confirm against the Java side.
+const char * TaskCounters::TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
+
+DEFINE_COUNTER(MAP_INPUT_RECORDS)
+DEFINE_COUNTER(MAP_OUTPUT_RECORDS)
+DEFINE_COUNTER(MAP_SKIPPED_RECORDS)
+DEFINE_COUNTER(MAP_INPUT_BYTES)
+DEFINE_COUNTER(MAP_OUTPUT_BYTES)
+DEFINE_COUNTER(MAP_OUTPUT_MATERIALIZED_BYTES)
+DEFINE_COUNTER(COMBINE_INPUT_RECORDS)
+DEFINE_COUNTER(COMBINE_OUTPUT_RECORDS)
+DEFINE_COUNTER(REDUCE_INPUT_GROUPS)
+DEFINE_COUNTER(REDUCE_SHUFFLE_BYTES)
+DEFINE_COUNTER(REDUCE_INPUT_RECORDS)
+DEFINE_COUNTER(REDUCE_OUTPUT_RECORDS)
+DEFINE_COUNTER(REDUCE_SKIPPED_GROUPS)
+DEFINE_COUNTER(REDUCE_SKIPPED_RECORDS)
+DEFINE_COUNTER(SPILLED_RECORDS)
+
+// Group name constant for the two file-byte counters below.
+const char * TaskCounters::FILESYSTEM_COUNTER_GROUP = "FileSystemCounters";
+
+DEFINE_COUNTER(FILE_BYTES_READ)
+DEFINE_COUNTER(FILE_BYTES_WRITTEN)
+
+} // namespace NativeTask

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h
new file mode 100644
index 0000000..6afc207
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TaskCounters.h
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TASKCOUNTERS_H_
+#define TASKCOUNTERS_H_
+
+namespace NativeTask {
+
+class TaskCounters {  // holder for counter-name string constants; only static members
+public:
+  static const char * TASK_COUNTER_GROUP;  // group name constant
+
+  static const char * MAP_INPUT_RECORDS;  // counter name constants follow
+  static const char * MAP_OUTPUT_RECORDS;
+  static const char * MAP_SKIPPED_RECORDS;
+  static const char * MAP_INPUT_BYTES;
+  static const char * MAP_OUTPUT_BYTES;
+  static const char * MAP_OUTPUT_MATERIALIZED_BYTES;
+  static const char * COMBINE_INPUT_RECORDS;
+  static const char * COMBINE_OUTPUT_RECORDS;
+  static const char * REDUCE_INPUT_GROUPS;
+  static const char * REDUCE_SHUFFLE_BYTES;
+  static const char * REDUCE_INPUT_RECORDS;
+  static const char * REDUCE_OUTPUT_RECORDS;
+  static const char * REDUCE_SKIPPED_GROUPS;
+  static const char * REDUCE_SKIPPED_RECORDS;
+  static const char * SPILLED_RECORDS;
+
+  static const char * FILESYSTEM_COUNTER_GROUP;  // second group name constant
+
+  static const char * FILE_BYTES_READ;
+  static const char * FILE_BYTES_WRITTEN;
+};
+
+} // namespace NativeTask
+
+#endif /* TASKCOUNTERS_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h
new file mode 100644
index 0000000..3f96faf
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/TrackingCollector.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TRACKING_COLLECTOR_H
+#define TRACKING_COLLECTOR_H
+
+#include <stdint.h>
+#include <string>
+
+namespace NativeTask {
+
+/**
+ * Collector wrapper that calls increase() on a Counter for every
+ * collect() call and then forwards the record, unchanged, to the
+ * wrapped Collector.
+ *
+ * Neither pointer is checked for NULL and neither is deleted here.
+ */
+class TrackingCollector : public Collector {
+protected:
+  Collector * _collector;
+  Counter * _counter;
+public:
+  TrackingCollector(Collector * collector, Counter * counter)
+      : _collector(collector), _counter(counter) {
+  }
+
+  /** Count the record, then forward it without a partition. */
+  virtual void collect(const void * key, uint32_t keyLen, const void * value,
+      uint32_t valueLen) {
+    _counter->increase();
+    _collector->collect(key, keyLen, value, valueLen);
+  }
+
+  /** Count the record, then forward it together with its partition. */
+  virtual void collect(const void * key, uint32_t keyLen, const void * value,
+      uint32_t valueLen, int32_t partition) {
+    _counter->increase();
+    _collector->collect(key, keyLen, value, valueLen, partition);
+  }
+};
+
+} //namespace NativeTask
+
+#endif //TRACKING_COLLECTOR_H

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h
new file mode 100644
index 0000000..07b2cf1
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/commons.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMONS_H_
+#define COMMONS_H_
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include <stdio.h>
+#include <memory.h>
+#include <fcntl.h>
+
+#include <limits>
+#include <string>
+#include <vector>
+#include <list>
+#include <set>
+#include <map>
+#include <algorithm>
+
+#include "primitives.h"
+#include "Log.h"
+#include "NativeTask.h"
+
+#include "Constants.h"
+
+#include "Iterator.h"
+#include "TrackingCollector.h"
+
+#endif /* COMMONS_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc
new file mode 100644
index 0000000..01eaa57
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.cc
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "util/SyncUtils.h"
+#include "jniutils.h"
+
+using namespace NativeTask;
+
+/**
+ * Return the JavaVM for this process, creating one if none exists.
+ *
+ * The result is cached in a function-local static. On the first call
+ * JNI_GetCreatedJavaVMs is asked for an existing VM; if there is none, a
+ * new VM is created with -Djava.class.path taken from the CLASSPATH
+ * environment variable.
+ *
+ * Failures (JNI call errors, CLASSPATH unset, allocation failure) are
+ * reported through THROW_EXCEPTION with NativeTask::HadoopException.
+ *
+ * @return the cached or newly obtained JavaVM pointer
+ */
+JavaVM * JNU_GetJVM(void) {
+  static JavaVM * gJVM = NULL;
+  static Lock GJVMLock;
+  // NOTE(review): this first check reads gJVM without holding GJVMLock;
+  // confirm that is acceptable on the supported platforms.
+  if (gJVM != NULL) {
+    return gJVM;
+  }
+  {
+    ScopeLock<Lock> autolock(GJVMLock);
+    if (gJVM == NULL) {
+      jint rv = 0;
+      jint noVMs = 0;
+      rv = JNI_GetCreatedJavaVMs(&gJVM, 1, &noVMs);
+      if (rv != 0) {
+        THROW_EXCEPTION(NativeTask::HadoopException, "JNI_GetCreatedJavaVMs failed");
+      }
+      if (noVMs == 0) {
+        char *hadoopClassPath = getenv("CLASSPATH");
+        if (hadoopClassPath == NULL) {
+          THROW_EXCEPTION(NativeTask::HadoopException, "Environment variable CLASSPATH not set!");
+          return NULL;
+        }
+        const char *hadoopClassPathVMArg = "-Djava.class.path=";
+        size_t optHadoopClassPathLen = strlen(hadoopClassPath) + strlen(hadoopClassPathVMArg) + 1;
+        char *optHadoopClassPath = (char*)malloc(sizeof(char) * optHadoopClassPathLen);
+        if (optHadoopClassPath == NULL) {
+          THROW_EXCEPTION(NativeTask::HadoopException, "Out of memory building JVM class path option");
+          return NULL;
+        }
+        snprintf(optHadoopClassPath, optHadoopClassPathLen, "%s%s", hadoopClassPathVMArg,
+            hadoopClassPath);
+        // A const int with a constant initializer keeps this a plain
+        // fixed-size array rather than a compiler-extension VLA.
+        const int noArgs = 1;
+        JavaVMOption options[noArgs];
+        options[0].optionString = optHadoopClassPath;
+
+        //Create the VM
+        JavaVMInitArgs vm_args;
+        vm_args.version = JNI_VERSION_1_6;
+        vm_args.options = options;
+        vm_args.nOptions = noArgs;
+        vm_args.ignoreUnrecognized = 1;
+        JNIEnv * jenv;
+        rv = JNI_CreateJavaVM(&gJVM, (void**)&jenv, &vm_args);
+        // The option string is no longer needed once the call returns;
+        // release it on the failure path too so it cannot leak.
+        free(optHadoopClassPath);
+        if (rv != 0) {
+          // Never leave a pointer from a failed creation attempt cached.
+          gJVM = NULL;
+          THROW_EXCEPTION(NativeTask::HadoopException, "JNI_CreateJavaVM failed");
+          return NULL;
+        }
+      }
+    }
+  }
+  return gJVM;
+}
+
+/**
+ * Obtain the JNIEnv for the calling thread by attaching it to the VM
+ * returned by JNU_GetJVM().
+ *
+ * A non-zero result from AttachCurrentThread is reported through
+ * THROW_EXCEPTION with NativeTask::HadoopException.
+ */
+JNIEnv* JNU_GetJNIEnv(void) {
+  JNIEnv * threadEnv;
+  JavaVM * vm = JNU_GetJVM();
+  const jint status = vm->AttachCurrentThread((void **)&threadEnv, NULL);
+  if (status != 0) {
+    THROW_EXCEPTION(NativeTask::HadoopException, "Call to AttachCurrentThread failed");
+  }
+  return threadEnv;
+}
+
+void JNU_AttachCurrentThread() {  // attaches by calling JNU_GetJNIEnv()
+  JNU_GetJNIEnv();  // the returned JNIEnv pointer is discarded
+}
+
+/**
+ * Detach the calling thread from the VM returned by JNU_GetJVM().
+ *
+ * A non-zero result from DetachCurrentThread is reported through
+ * THROW_EXCEPTION with NativeTask::HadoopException.
+ */
+void JNU_DetachCurrentThread() {
+  JavaVM * vm = JNU_GetJVM();
+  const jint status = vm->DetachCurrentThread();
+  if (status == 0) {
+    return;
+  }
+  THROW_EXCEPTION(NativeTask::HadoopException, "Call to DetachCurrentThread failed");
+}
+
+/**
+ * Raise a Java exception of class `name` with message `msg` on `jenv`.
+ *
+ * If the class cannot be found, ThrowNew is not called; whatever
+ * FindClass left pending on `jenv` is left untouched.
+ *
+ * @param jenv JNI environment of the calling thread
+ * @param name class name in the form FindClass expects
+ * @param msg  message passed to ThrowNew
+ */
+void JNU_ThrowByName(JNIEnv *jenv, const char *name, const char *msg) {
+  jclass cls = jenv->FindClass(name);
+  if (cls != NULL) {
+    jenv->ThrowNew(cls, msg);
+    // Release the local reference only when one was actually obtained.
+    jenv->DeleteLocalRef(cls);
+  }
+}
+
+/**
+ * Copy the contents of a Java byte array into a std::string.
+ *
+ * @param jenv JNI environment of the calling thread
+ * @param src  source array; may be NULL
+ * @return a string holding the array's bytes (embedded zero bytes are
+ *         kept); an empty string when `src` is NULL or has length 0
+ */
+std::string JNU_ByteArrayToString(JNIEnv * jenv, jbyteArray src) {
+  if (NULL != src) {
+    jsize len = jenv->GetArrayLength(src);
+    std::string ret(len, '\0');
+    if (len > 0) {
+      // Write through &ret[0]: data() returns a pointer to const before
+      // C++17, so the string must not be modified through it.
+      jenv->GetByteArrayRegion(src, 0, len, (jbyte*)&ret[0]);
+    }
+    return ret;
+  }
+  return std::string();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h
new file mode 100644
index 0000000..45c4fda
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/jniutils.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef JNIUTILS_H_
+#define JNIUTILS_H_
+
+#include <string>
+#include <jni.h>
+
+/**
+ * Get current JavaVM, if none then try to create one.
+ */
+JavaVM * JNU_GetJVM(void);
+
+/**
+ * Get JNIEnv for current thread.
+ */
+JNIEnv* JNU_GetJNIEnv(void);
+
+/**
+ * Attach the current thread; same effect as JNU_GetJNIEnv.
+ */
+void JNU_AttachCurrentThread();
+
+/**
+ * Detach the current thread. Call this if the current thread
+ * was created on the native side and has previously called
+ * JNU_AttachCurrentThread.
+ */
+void JNU_DetachCurrentThread();
+
+/**
+ * Throw a java exception.
+ */
+void JNU_ThrowByName(JNIEnv *jenv, const char *name, const char *msg);
+
+/**
+ * Convert a java byte array to c++ std::string
+ */
+std::string JNU_ByteArrayToString(JNIEnv * jenv, jbyteArray src);
+
+#endif /* JNIUTILS_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h
new file mode 100644
index 0000000..6d780cf
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/org_apache_hadoop_mapred_nativetask_NativeRuntime.h
@@ -0,0 +1,58 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_apache_hadoop_mapred_nativetask_NativeRuntime */
+
+#ifndef _Included_org_apache_hadoop_mapred_nativetask_NativeRuntime
+#define _Included_org_apache_hadoop_mapred_nativetask_NativeRuntime
+#ifdef __cplusplus
+extern "C" {
+#endif
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIRelease
+ * Signature: ()V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRelease(JNIEnv
*,
+    jclass);
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIConfigure
+ * Signature: ([[B)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIConfigure(
+    JNIEnv *, jclass, jobjectArray);
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNICreateNativeObject
+ * Signature: ([B)J
+ */JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateNativeObject(
+    JNIEnv *, jclass, jbyteArray);
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNICreateDefaultNativeObject
+ * Signature: ([B)J
+ */JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateDefaultNativeObject(
+    JNIEnv *, jclass, jbyteArray);
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIReleaseNativeObject
+ * Signature: (J)V
+ */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIReleaseNativeObject(
+    JNIEnv *, jclass, jlong);
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIRegisterModule
+ * Signature: ([B[B)I
+ */JNIEXPORT jint JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRegisterModule(
+    JNIEnv *, jclass, jbyteArray, jbyteArray);
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIUpdateStatus
+ * Signature: ()[B
+ */JNIEXPORT jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIUpdateStatus(
+    JNIEnv *, jclass);
+
+#ifdef __cplusplus
+}
+#endif
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/primitives.h
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/primitives.h
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/primitives.h
new file mode 100644
index 0000000..4c0c1a7
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/primitives.h
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * High performance primitive functions
+ *
+ **/
+
+#ifndef PRIMITIVES_H_
+#define PRIMITIVES_H_
+
+#include <stddef.h>
+#include <stdint.h>
+#include <assert.h>
+#include <string>
+
+#ifdef __GNUC__
+#define likely(x)       __builtin_expect((x),1)  // branch hint: x is expected to equal 1
+#define unlikely(x)     __builtin_expect((x),0)  // branch hint: x is expected to equal 0
+#else
+#define likely(x)       (x)  // no hint available: expands to the bare condition
+#define unlikely(x)     (x)  // NOTE(review): x is not normalised with !!, so on GCC the likely() hint only matches when x is exactly 1 - confirm callers pass boolean expressions
+#endif
+
+//#define SIMPLE_MEMCPY
+
+#if !defined(SIMPLE_MEMCPY)
+#define simple_memcpy memcpy  // default build: both names are plain aliases for memcpy
+#define simple_memcpy2 memcpy  // NOTE(review): the SIMPLE_MEMCPY branch below defines no simple_memcpy2 - confirm it is provided elsewhere
+#else
+
+/**
+ * Hand-written memcpy used only when SIMPLE_MEMCPY is defined.
+ *
+ * This memcpy assumes src & dest do not overlap,
+ * and that len is normally very small (<64).
+ * This function is primarily optimized for x86-64 processors,
+ * on which unaligned 64-bit loads and stores are cheap.
+ *
+ * Strategy by length:
+ *   0..4    handled directly in the switch (0 copies nothing)
+ *   5..7    two 4-byte copies, the second one ending at the last byte
+ *   8..127  8-byte copies walking down from offset len-8, then one at offset 0
+ *   >=128   delegated to ::memcpy
+ * The 4- and 8-byte copies may overlap each other, so some bytes are
+ * written twice with the same value.
+ *
+ * @param dest: dest buffer
+ * @param src:  src buffer
+ * @param len: number of bytes to copy; 0 is accepted and returns immediately
+ */
+inline void simple_memcpy(void * dest, const void * src, size_t len) {
+  const uint8_t * src8 = (const uint8_t*)src;
+  uint8_t * dest8 = (uint8_t*)dest;
+  switch (len) {  // lengths 0..4 are copied with at most two stores
+    case 0:
+    return;
+    case 1:
+    dest8[0]=src8[0];
+    return;
+    case 2:
+    *(uint16_t*)dest8=*(const uint16_t*)src8;
+    return;
+    case 3:
+    *(uint16_t*)dest8 = *(const uint16_t*)src8;  // first two bytes as one 16-bit store
+    dest8[2]=src8[2];  // then the third byte
+    return;
+    case 4:
+    *(uint32_t*)dest8 = *(const uint32_t*)src8;
+    return;
+  }
+  if (len<8) {  // len is 5..7 here
+    *(uint32_t*)dest8 = *(const uint32_t*)src8;  // first four bytes
+    *(uint32_t*)(dest8+len-4) = *(const uint32_t*)(src8+len-4);  // last four bytes, overlapping the first store
+    return;
+  }
+  if (len<128) {  // len is 8..127 here
+    int64_t cur = (int64_t)len - 8;  // offset of the last full 8-byte window
+    while (cur>0) {
+      *(uint64_t*)(dest8+cur) = *(const uint64_t*)(src8+cur);
+      cur -= 8;
+    }
+    *(uint64_t*)(dest8) = *(const uint64_t*)(src8);  // final window at offset 0; may overlap the previous one
+    return;
+  }
+  ::memcpy(dest, src, len);  // len >= 128: use the library routine
+}
+
+#endif
+
+/**
+ * little-endian to big-endian or vice versa
+ */
+inline uint32_t bswap(uint32_t val) {
+  __asm__("bswap %0" : "=r" (val) : "0" (val));
+  return val;
+}
+
+/**
+ * Reverse the byte order of a 64-bit value
+ * (little-endian to big-endian or vice versa).
+ *
+ * The previous version selected its fast path with the non-standard
+ * __X64 macro and otherwise fell back to two x86-asm 32-bit swaps.
+ * This version uses the compiler builtin when building with a
+ * GNU-compatible compiler and plain shifts otherwise, so it does not
+ * depend on the target architecture.
+ *
+ * @param val value whose bytes are to be reversed
+ * @return val with its eight bytes in reverse order
+ */
+inline uint64_t bswap64(uint64_t val) {
+#ifdef __GNUC__
+  return __builtin_bswap64(val);
+#else
+  // Portable fallback: swap adjacent bytes, then 16-bit pairs, then halves.
+  val = ((val & 0x00ff00ff00ff00ffULL) << 8) | ((val >> 8) & 0x00ff00ff00ff00ffULL);
+  val = ((val & 0x0000ffff0000ffffULL) << 16) | ((val >> 16) & 0x0000ffff0000ffffULL);
+  return (val << 32) | (val >> 32);
+#endif
+}
+
+/**
+ * Fast memcmp
+ *
+ * Orders two buffers of equal length len. When BUILDIN_MEMCMP is defined
+ * this simply forwards to memcmp. Otherwise it compares using 1-, 4- and
+ * 8-byte loads through pointer casts (no alignment handling), byte-swapping
+ * the loaded words before ordering them.
+ * NOTE(review): the byte swap gives memcmp-style ordering only if the host
+ * stores the lowest-addressed byte as the least significant one
+ * (little-endian) - confirm for the targets in use.
+ *
+ * Return value: 0 when the buffers are equal. For len < 8 a non-zero result
+ * is the signed difference of the first differing bytes or swapped 32-bit
+ * words; for len >= 8 a non-zero result is exactly 1 or -1. Callers should
+ * rely on the sign only.
+ *
+ * @param src  first buffer, at least len bytes readable
+ * @param dest second buffer, at least len bytes readable
+ * @param len  number of bytes to compare
+ */
+inline int64_t fmemcmp(const char * src, const char * dest, uint32_t len) {
+
+#ifdef BUILDIN_MEMCMP
+  return memcmp(src, dest, len);
+#else
+
+  const uint8_t * src8 = (const uint8_t*)src;
+  const uint8_t * dest8 = (const uint8_t*)dest;
+  switch (len) {  // lengths 0..4 are resolved here; larger lengths fall out of the switch
+  case 0:
+    return 0;
+  case 1:
+    return (int64_t)src8[0] - (int64_t)dest8[0];
+  case 2: {
+    int64_t ret = ((int64_t)src8[0] - (int64_t)dest8[0]);
+    if (ret)
+      return ret;
+    return ((int64_t)src8[1] - (int64_t)dest8[1]);
+  }
+  case 3: {
+    int64_t ret = ((int64_t)src8[0] - (int64_t)dest8[0]);
+    if (ret)
+      return ret;
+    ret = ((int64_t)src8[1] - (int64_t)dest8[1]);
+    if (ret)
+      return ret;
+    return ((int64_t)src8[2] - (int64_t)dest8[2]);
+  }
+  case 4: {
+    return (int64_t)bswap(*(uint32_t*)src) - (int64_t)bswap(*(uint32_t*)dest);  // widened to int64_t so the difference cannot wrap
+  }
+  }
+  if (len < 8) {  // len is 5..7 here
+    int64_t ret = ((int64_t)bswap(*(uint32_t*)src) - (int64_t)bswap(*(uint32_t*)dest));  // first four bytes
+    if (ret) {
+      return ret;
+    }
+    return ((int64_t)bswap(*(uint32_t*)(src + len - 4))
+        - (int64_t)bswap(*(uint32_t*)(dest + len - 4)));  // last four bytes; overlaps the bytes already found equal
+  }
+  uint32_t cur = 0;
+  uint32_t end = len & (0xffffffffU << 3);  // len rounded down to a multiple of 8
+  while (cur < end) {
+    uint64_t l = *(uint64_t*)(src8 + cur);
+    uint64_t r = *(uint64_t*)(dest8 + cur);
+    if (l != r) {  // swap only once a differing word has been found
+      l = bswap64(l);
+      r = bswap64(r);
+      return l > r ? 1 : -1;
+    }
+    cur += 8;
+  }
+  uint64_t l = *(uint64_t*)(src8 + len - 8);  // final 8 bytes; re-reads already compared bytes
+  uint64_t r = *(uint64_t*)(dest8 + len - 8);
+  if (l != r) {
+    l = bswap64(l);
+    r = bswap64(r);
+    return l > r ? 1 : -1;
+  }
+  return 0;
+#endif
+}
+
+/**
+ * Order two buffers that may have different lengths.
+ *
+ * The common prefix is compared with the three-argument fmemcmp; if the
+ * prefixes are equal, the result is srcLen - destLen.
+ *
+ * @param src     first buffer
+ * @param dest    second buffer
+ * @param srcLen  length of src in bytes
+ * @param destLen length of dest in bytes
+ * @return 0 when equal, otherwise a value whose sign gives the order
+ */
+inline int64_t fmemcmp(const char * src, const char * dest, uint32_t srcLen,
+    uint32_t destLen) {
+  const uint32_t common = (destLen < srcLen) ? destLen : srcLen;
+  const int64_t prefixOrder = fmemcmp(src, dest, common);
+  if (prefixOrder != 0) {
+    return prefixOrder;
+  }
+  // Equal prefixes: order by length.
+  return (int64_t)srcLen - (int64_t)destLen;
+}
+
+/**
+ * Fast memory equal
+ *
+ * Tests two buffers of equal length len for byte-wise equality, scanning
+ * from the start. When BUILDIN_MEMCMP is defined this forwards to memcmp.
+ * Otherwise it compares using 1-, 2-, 4- and 8-byte loads through pointer
+ * casts (no alignment handling); the last load is placed so that it ends
+ * at the final byte and may overlap bytes that were already compared.
+ *
+ * @param src  first buffer, at least len bytes readable
+ * @param dest second buffer, at least len bytes readable
+ * @param len  number of bytes to compare
+ * @return true when the first len bytes of both buffers are identical
+ */
+inline bool fmemeq(const char * src, const char * dest, uint32_t len) {
+#ifdef BUILDIN_MEMCMP
+  return 0 == memcmp(src, dest, len);
+#else
+
+  const uint8_t * src8 = (const uint8_t*)src;
+  const uint8_t * dest8 = (const uint8_t*)dest;
+  switch (len) {  // lengths 0..4 are resolved here; larger lengths fall out of the switch
+  case 0:
+    return true;
+  case 1:
+    return src8[0] == dest8[0];
+  case 2:
+    return *(uint16_t*)src8 == *(uint16_t*)dest8;
+  case 3:
+    return (*(uint16_t*)src8 == *(uint16_t*)dest8) && (src8[2] == dest8[2]);
+  case 4:
+    return *(uint32_t*)src8 == *(uint32_t*)dest8;
+  }
+  if (len < 8) {  // len is 5..7 here: first four bytes, then the last four (overlapping)
+    return (*(uint32_t*)src8 == *(uint32_t*)dest8)
+        && (*(uint32_t*)(src8 + len - 4) == *(uint32_t*)(dest8 + len - 4));
+  }
+  uint32_t cur = 0;
+  uint32_t end = len & (0xffffffff << 3);  // len rounded down to a multiple of 8
+  while (cur < end) {
+    uint64_t l = *(uint64_t*)(src8 + cur);
+    uint64_t r = *(uint64_t*)(dest8 + cur);
+    if (l != r) {
+      return false;
+    }
+    cur += 8;
+  }
+  uint64_t l = *(uint64_t*)(src8 + len - 8);  // final 8 bytes; re-reads already compared bytes
+  uint64_t r = *(uint64_t*)(dest8 + len - 8);
+  if (l != r) {
+    return false;
+  }
+  return true;
+#endif
+}
+
+/**
+ * Equality test for two buffers that carry their own lengths.
+ *
+ * Buffers of different length are never equal; otherwise the contents are
+ * compared with the three-argument fmemeq. Note the parameter order here
+ * is (src, srcLen, dest, destLen).
+ *
+ * @param src     first buffer
+ * @param srcLen  length of src in bytes
+ * @param dest    second buffer
+ * @param destLen length of dest in bytes
+ * @return true when both lengths and contents match
+ */
+inline bool fmemeq(const char * src, uint32_t srcLen, const char * dest,
+    uint32_t destLen) {
+  if (srcLen != destLen) {
+    return false;
+  }
+  // The lengths are equal at this point, so either one can be passed on;
+  // this avoids std::min, for which this header never included <algorithm>.
+  return fmemeq(src, dest, srcLen);
+}
+
+/**
+ * Fast memory equal, reverse order
+ *
+ * Tests two buffers of equal length len for byte-wise equality, checking
+ * the end of the buffers first and the start last. Comparison uses 1-, 2-,
+ * 4- and 8-byte loads through pointer casts (no alignment handling); the
+ * final load at offset 0 may overlap bytes that were already compared.
+ *
+ * @param src  first buffer, at least len bytes readable
+ * @param dest second buffer, at least len bytes readable
+ * @param len  number of bytes to compare
+ * @return true when the first len bytes of both buffers are identical
+ */
+inline bool frmemeq(const char * src, const char * dest, uint32_t len) {
+  const uint8_t * src8 = (const uint8_t*)src;
+  const uint8_t * dest8 = (const uint8_t*)dest;
+  switch (len) {
+  case 0:
+    return true;
+  case 1:
+    return src8[0] == dest8[0];
+  case 2:
+    return *(const uint16_t*)src8 == *(const uint16_t*)dest8;
+  case 3:
+    return (src8[2] == dest8[2]) && (*(const uint16_t*)src8 == *(const uint16_t*)dest8);
+  case 4:
+    return *(const uint32_t*)src8 == *(const uint32_t*)dest8;
+  }
+  if (len < 8) {
+    // len is 5..7: last four bytes first, then the first four (overlapping).
+    return (*(const uint32_t*)(src8 + len - 4) == *(const uint32_t*)(dest8 + len - 4))
+        && (*(const uint32_t*)src8 == *(const uint32_t*)dest8);
+  }
+  // 64-bit signed cursor: with a 32-bit one, len >= 2^31 turned negative,
+  // the loop was skipped and only the first 8 bytes were compared.
+  int64_t cur = (int64_t)len - 8;
+  while (cur > 0) {
+    if (*(const uint64_t*)(src8 + cur) != *(const uint64_t*)(dest8 + cur)) {
+      return false;
+    }
+    cur -= 8;
+  }
+  return *(const uint64_t*)(src8) == *(const uint64_t*)(dest8);
+}
+
+/**
+ * Reverse-order equality test for two buffers that carry their own lengths.
+ *
+ * Buffers of different length are never equal; otherwise the contents are
+ * compared with the three-argument frmemeq. Note the parameter order here
+ * is (src, dest, srcLen, destLen).
+ *
+ * @param src     first buffer
+ * @param dest    second buffer
+ * @param srcLen  length of src in bytes
+ * @param destLen length of dest in bytes
+ * @return true when both lengths and contents match
+ */
+inline bool frmemeq(const char * src, const char * dest, uint32_t srcLen,
+    uint32_t destLen) {
+  if (srcLen != destLen) {
+    return false;
+  }
+  // The lengths are equal at this point, so either one can be passed on;
+  // this avoids std::min, for which this header never included <algorithm>.
+  return frmemeq(src, dest, srcLen);
+}
+
+#endif /* PRIMITIVES_H_ */


Mime
View raw message