hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [28/64] [abbrv] git commit: MAPREDUCE-5996. native-task: Rename system tests into standard directory layout. Contributed by Todd Lipcon.
Date Sat, 13 Sep 2014 01:41:33 GMT
MAPREDUCE-5996. native-task: Rename system tests into standard directory layout. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-2841@1613004 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b2cba48f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b2cba48f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b2cba48f

Branch: refs/heads/trunk
Commit: b2cba48f36c96f190f6a25e65291873be7aee322
Parents: d468a92
Author: Todd Lipcon <todd@apache.org>
Authored: Thu Jul 24 06:17:33 2014 +0000
Committer: Todd Lipcon <todd@apache.org>
Committed: Thu Jul 24 06:17:33 2014 +0000

----------------------------------------------------------------------
 .../CHANGES.MAPREDUCE-2841.txt                  |   1 +
 .../nativetask/combinertest/CombinerTest.java   | 122 +++++++++++
 .../combinertest/LargeKVCombinerTest.java       |  96 +++++++++
 .../combinertest/OldAPICombinerTest.java        | 107 ++++++++++
 .../nativetask/combinertest/WordCount.java      |  89 ++++++++
 .../combinertest/WordCountWithOldAPI.java       |  65 ++++++
 .../nativetask/compresstest/CompressMapper.java |  69 ++++++
 .../nativetask/compresstest/CompressTest.java   | 134 ++++++++++++
 .../nativetask/kvtest/HashSumReducer.java       |  48 +++++
 .../hadoop/mapred/nativetask/kvtest/KVJob.java  |  97 +++++++++
 .../hadoop/mapred/nativetask/kvtest/KVTest.java | 181 ++++++++++++++++
 .../mapred/nativetask/kvtest/LargeKVTest.java   | 129 +++++++++++
 .../mapred/nativetask/kvtest/TestInputFile.java | 213 +++++++++++++++++++
 .../nativetask/nonsorttest/NonSortTest.java     |  99 +++++++++
 .../nativetask/nonsorttest/NonSortTestMR.java   |  71 +++++++
 .../nativetask/testutil/BytesFactory.java       | 104 +++++++++
 .../EnforceNativeOutputCollectorDelegator.java  |  48 +++++
 .../nativetask/testutil/MockValueClass.java     |  72 +++++++
 .../nativetask/testutil/ResultVerifier.java     | 141 ++++++++++++
 .../testutil/ScenarioConfiguration.java         |  58 +++++
 .../nativetask/testutil/TestConstants.java      |  67 ++++++
 .../test/java/system/data/testGlibcBugSpill.out |   2 -
 .../nativetask/combinertest/CombinerTest.java   | 122 -----------
 .../combinertest/LargeKVCombinerTest.java       |  96 ---------
 .../combinertest/OldAPICombinerTest.java        | 107 ----------
 .../nativetask/combinertest/WordCount.java      |  89 --------
 .../combinertest/WordCountWithOldAPI.java       |  65 ------
 .../nativetask/compresstest/CompressMapper.java |  69 ------
 .../nativetask/compresstest/CompressTest.java   | 134 ------------
 .../nativetask/kvtest/HashSumReducer.java       |  48 -----
 .../hadoop/mapred/nativetask/kvtest/KVJob.java  |  97 ---------
 .../hadoop/mapred/nativetask/kvtest/KVTest.java | 181 ----------------
 .../mapred/nativetask/kvtest/LargeKVTest.java   | 129 -----------
 .../mapred/nativetask/kvtest/TestInputFile.java | 213 -------------------
 .../nativetask/nonsorttest/NonSortTest.java     |  99 ---------
 .../nativetask/nonsorttest/NonSortTestMR.java   |  71 -------
 .../nativetask/testutil/BytesFactory.java       | 104 ---------
 .../EnforceNativeOutputCollectorDelegator.java  |  48 -----
 .../nativetask/testutil/MockValueClass.java     |  72 -------
 .../nativetask/testutil/ResultVerifier.java     | 141 ------------
 .../testutil/ScenarioConfiguration.java         |  58 -----
 .../nativetask/testutil/TestConstants.java      |  67 ------
 42 files changed, 2011 insertions(+), 2012 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
index 166d68d..cea5a76 100644
--- a/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
+++ b/hadoop-mapreduce-project/CHANGES.MAPREDUCE-2841.txt
@@ -3,3 +3,4 @@ Changes for Hadoop Native Map Output Collector
 
 MAPREDUCE-5985. native-task: Fix build on macosx. Contributed by Binglin Chang
 MAPREDUCE-5994. Simplify ByteUtils and fix failing test. (todd)
+MAPREDUCE-5996. native-task: Rename system tests into standard directory layout (todd)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
new file mode 100644
index 0000000..8a4aa6f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/CombinerTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.nativetask.combinertest.WordCount.IntSumReducer;
+import org.apache.hadoop.mapred.nativetask.combinertest.WordCount.TokenizerMapper;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CombinerTest {
+  private FileSystem fs;
+  private String inputpath;
+  private String nativeoutputpath;
+  private String hadoopoutputpath;
+
+  @Test
+  public void testWordCountCombiner() {
+    try {
+
+      final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+      nativeConf.addResource(TestConstants.COMBINER_CONF_PATH);
+      final Job nativejob = getJob("nativewordcount", nativeConf, inputpath, nativeoutputpath);
+
+      final Configuration commonConf = ScenarioConfiguration.getNormalConfiguration();
+      commonConf.addResource(TestConstants.COMBINER_CONF_PATH);
+
+      final Job normaljob = getJob("normalwordcount", commonConf, inputpath, hadoopoutputpath);
+
+      nativejob.waitForCompletion(true);
+            
+      Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+      
+      normaljob.waitForCompletion(true);
+      Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+       
+      assertEquals(true, ResultVerifier.verify(nativeoutputpath, hadoopoutputpath));
+      assertEquals("Native Reduce reduce group counter should equal orignal reduce group counter", 
+          nativeReduceGroups.getValue(), normalReduceGroups.getValue());
+      
+    } catch (final Exception e) {
+      e.printStackTrace();
+      assertEquals("run exception", true, false);
+    }
+  }
+
+  @Before
+  public void startUp() throws Exception {
+    final ScenarioConfiguration conf = new ScenarioConfiguration();
+    conf.addcombinerConf();
+
+    this.fs = FileSystem.get(conf);
+
+    this.inputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY,
+        TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/wordcount";
+
+    if (!fs.exists(new Path(inputpath))) {
+      new TestInputFile(
+          conf.getInt(TestConstants.NATIVETASK_COMBINER_WORDCOUNT_FILESIZE, 1000000),
+          Text.class.getName(),
+          Text.class.getName(), conf).createSequenceTestFile(inputpath, 1, (byte)('a'));
+    }
+
+    this.nativeoutputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH,
+        TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/nativewordcount";
+    this.hadoopoutputpath = conf.get(TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH,
+        TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/normalwordcount";
+  }
+
+  protected static Job getJob(String jobname, Configuration inputConf, String inputpath, String outputpath)
+      throws Exception {
+    final Configuration conf = new Configuration(inputConf);
+    conf.set("fileoutputpath", outputpath);
+    final FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(new Path(outputpath))) {
+      fs.delete(new Path(outputpath));
+    }
+    fs.close();
+    final Job job = new Job(conf, jobname);
+    job.setJarByClass(WordCount.class);
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.addInputPath(job, new Path(inputpath));
+    FileOutputFormat.setOutputPath(job, new Path(outputpath));
+    return job;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
new file mode 100644
index 0000000..50953e0
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/LargeKVCombinerTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+public class LargeKVCombinerTest {
+
+  @Test
+  public void testLargeValueCombiner(){
+    final Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+    final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+    normalConf.addResource(TestConstants.COMBINER_CONF_PATH);
+    nativeConf.addResource(TestConstants.COMBINER_CONF_PATH);
+    final int deafult_KVSize_Maximum = 1 << 22; // 4M
+    final int KVSize_Maximu = normalConf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST,
+        deafult_KVSize_Maximum);
+    final String inputPath = normalConf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY,
+        TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/largeKV";
+    final String nativeOutputPath = normalConf.get(TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH,
+        TestConstants.NATIVETASK_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/nativeLargeKV";
+    final String hadoopOutputPath = normalConf.get(TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH,
+        TestConstants.NORMAL_TEST_COMBINER_OUTPUTPATH_DEFAULTV) + "/normalLargeKV";
+    try {
+      final FileSystem fs = FileSystem.get(normalConf);
+      for (int i = 65536; i <= KVSize_Maximu; i *= 4) {
+        
+        int max = i;
+        int min = Math.max(i / 4, max - 10);
+        
+        System.out.println("===KV Size Test: min size: " + min + ", max size: " + max);
+        
+        normalConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+        normalConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+        nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+        nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+        fs.delete(new Path(inputPath), true);
+        new TestInputFile(normalConf.getInt(TestConstants.NATIVETASK_COMBINER_WORDCOUNT_FILESIZE,
+            1000000), IntWritable.class.getName(),
+            Text.class.getName(), normalConf).createSequenceTestFile(inputPath, 1);
+        
+        final Job normaljob = CombinerTest.getJob("normalwordcount", normalConf, inputPath, hadoopOutputPath);
+        final Job nativejob = CombinerTest.getJob("nativewordcount", nativeConf, inputPath, nativeOutputPath);
+        
+        nativejob.waitForCompletion(true);
+        Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+        
+        normaljob.waitForCompletion(true);
+        Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+        
+        final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath);
+        
+        final String reason = "LargeKVCombinerTest failed with, min size: " + min
+            + ", max size: " + max + ", normal out: " + hadoopOutputPath + ", native Out: " + nativeOutputPath;
+        
+        assertEquals(reason, true, compareRet);
+//        assertEquals("Native Reduce reduce group counter should equal orignal reduce group counter", 
+//            nativeReduceGroups.getValue(), normalReduceGroups.getValue());
+      }
+      fs.close();
+    } catch (final Exception e) {
+      e.printStackTrace();
+      assertEquals("run exception", true, false);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java
new file mode 100644
index 0000000..5691e02
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/OldAPICombinerTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Counter;
+import org.junit.Before;
+import org.junit.Test;
+
+public class OldAPICombinerTest {
+  private FileSystem fs;
+  private String inputpath;
+
+  @Test
+  public void testWordCountCombinerWithOldAPI() throws Exception {
+    final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+    nativeConf.addResource(TestConstants.COMBINER_CONF_PATH);
+    final String nativeoutput = nativeConf.get(TestConstants.OLDAPI_NATIVETASK_TEST_COMBINER_OUTPUTPATH);
+    final JobConf nativeJob = getOldAPIJobconf(nativeConf, "nativeCombinerWithOldAPI", inputpath, nativeoutput);
+    RunningJob nativeRunning = JobClient.runJob(nativeJob);
+
+    Counter nativeReduceGroups = nativeRunning.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+    
+    final Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+    normalConf.addResource(TestConstants.COMBINER_CONF_PATH);
+    final String normaloutput = normalConf.get(TestConstants.OLDAPI_NORMAL_TEST_COMBINER_OUTPUTPATH);
+    final JobConf normalJob = getOldAPIJobconf(normalConf, "normalCombinerWithOldAPI", inputpath, normaloutput);
+    
+    RunningJob normalRunning = JobClient.runJob(normalJob);
+    Counter normalReduceGroups = normalRunning.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
+    
+    final boolean compareRet = ResultVerifier.verify(nativeoutput, normaloutput);
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+    
+    assertEquals("The input reduce record count must be same", nativeReduceGroups.getValue(), normalReduceGroups.getValue());
+  }
+
+  @Before
+  public void startUp() throws Exception {
+    final ScenarioConfiguration conf = new ScenarioConfiguration();
+    conf.addcombinerConf();
+    this.fs = FileSystem.get(conf);
+    this.inputpath = conf.get(TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_KEY,
+        TestConstants.NATIVETASK_TEST_COMBINER_INPUTPATH_DEFAULTV) + "/wordcount";
+
+    if (!fs.exists(new Path(inputpath))) {
+      new TestInputFile(conf.getInt("nativetask.combiner.wordcount.filesize", 1000000), Text.class.getName(),
+          Text.class.getName(), conf).createSequenceTestFile(inputpath, 1, (byte)('a'));
+    }
+  }
+
+  private static JobConf getOldAPIJobconf(Configuration configuration, String name, String input, String output)
+      throws Exception {
+    final JobConf jobConf = new JobConf(configuration);
+    final FileSystem fs = FileSystem.get(configuration);
+    if (fs.exists(new Path(output))) {
+      fs.delete(new Path(output), true);
+    }
+    fs.close();
+    jobConf.setJobName(name);
+    jobConf.setOutputKeyClass(Text.class);
+    jobConf.setOutputValueClass(IntWritable.class);
+    jobConf.setMapperClass(WordCountWithOldAPI.TokenizerMapperWithOldAPI.class);
+    jobConf.setCombinerClass(WordCountWithOldAPI.IntSumReducerWithOldAPI.class);
+    jobConf.setReducerClass(WordCountWithOldAPI.IntSumReducerWithOldAPI.class);
+
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    jobConf.setOutputFormat(TextOutputFormat.class);
+
+    FileInputFormat.setInputPaths(jobConf, new Path(input));
+    FileOutputFormat.setOutputPath(jobConf, new Path(output));
+    return jobConf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java
new file mode 100644
index 0000000..490b82b
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCount.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class WordCount {
+
+  private static Log LOG = LogFactory.getLog(WordCount.class);
+  
+  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
+
+    private final static IntWritable one = new IntWritable(1);
+    private final Text word = new Text();
+
+    @Override
+    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+      final StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+    private final IntWritable result = new IntWritable();
+
+    @Override
+    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
+    InterruptedException {
+      int sum = 0;
+      for (final IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    final Configuration conf = new Configuration();
+    final String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length != 2) {
+      System.err.println("Usage: wordcount <in> <out>");
+      System.exit(2);
+    }
+    final Job job = new Job(conf, conf.get(MRJobConfig.JOB_NAME, "word count"));
+    job.setJarByClass(WordCount.class);
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java
new file mode 100644
index 0000000..a11ea91
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/combinertest/WordCountWithOldAPI.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.combinertest;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+public class WordCountWithOldAPI {
+
+  public static class TokenizerMapperWithOldAPI extends MapReduceBase implements
+  Mapper<Object, Text, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private final Text word = new Text();
+
+    @Override
+    public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      final StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        output.collect(word, one);
+      }
+    }
+  }
+
+  public static class IntSumReducerWithOldAPI extends MapReduceBase implements
+  Reducer<Text, IntWritable, Text, IntWritable> {
+    private final IntWritable result = new IntWritable();
+
+    @Override
+    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
+        Reporter reporter) throws IOException {
+      int sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+      result.set(sum);
+      output.collect(key, result);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java
new file mode 100644
index 0000000..83c4794
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressMapper.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.compresstest;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class CompressMapper {
+  public static final String inputFile = "./compress/input.txt";
+  public static final String outputFileDir = "./compress/output/";
+
+  public static class TextCompressMapper extends Mapper<Text, Text, Text, Text> {
+
+    @Override
+    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+       context.write(key, value);
+    }
+  }
+
+  public static Job getCompressJob(String jobname, Configuration conf) {
+    Job job = null;
+    try {
+      job = new Job(conf, jobname + "-CompressMapperJob");
+      job.setJarByClass(CompressMapper.class);
+      job.setMapperClass(TextCompressMapper.class);
+      job.setOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(Text.class);
+      final Path outputpath = new Path(outputFileDir + jobname);
+      // if output file exists ,delete it
+      final FileSystem hdfs = FileSystem.get(new ScenarioConfiguration());
+      if (hdfs.exists(outputpath)) {
+        hdfs.delete(outputpath);
+      }
+      hdfs.close();
+      job.setInputFormatClass(SequenceFileInputFormat.class);
+      FileInputFormat.addInputPath(job, new Path(inputFile));
+      FileOutputFormat.setOutputPath(job, outputpath);
+    } catch (final Exception e) {
+      e.printStackTrace();
+    }
+    return job;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
new file mode 100644
index 0000000..0406375
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/compresstest/CompressTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.compresstest;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CompressTest {
+
+  @Test
+  public void testSnappyCompress() throws Exception {
+    final Configuration conf = ScenarioConfiguration.getNativeConfiguration();
+    conf.addResource(TestConstants.SNAPPY_COMPRESS_CONF_PATH);
+    final Job job = CompressMapper.getCompressJob("nativesnappy", conf);
+    job.waitForCompletion(true);
+
+    final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration();
+    hadoopconf.addResource(TestConstants.SNAPPY_COMPRESS_CONF_PATH);
+    final Job hadoopjob = CompressMapper.getCompressJob("hadoopsnappy", hadoopconf);
+    hadoopjob.waitForCompletion(true);
+
+    final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativesnappy",
+        CompressMapper.outputFileDir + "hadoopsnappy");
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Test
+  public void testGzipCompress() throws Exception {
+    final Configuration conf = ScenarioConfiguration.getNativeConfiguration();
+    conf.addResource(TestConstants.GZIP_COMPRESS_CONF_PATH);
+    final Job job = CompressMapper.getCompressJob("nativegzip", conf);
+    job.waitForCompletion(true);
+
+    final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration();
+    hadoopconf.addResource(TestConstants.GZIP_COMPRESS_CONF_PATH);
+    final Job hadoopjob = CompressMapper.getCompressJob("hadoopgzip", hadoopconf);
+    hadoopjob.waitForCompletion(true);
+
+    final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativegzip",
+        CompressMapper.outputFileDir + "hadoopgzip");
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Test
+  public void testBzip2Compress() throws Exception {
+    final Configuration nativeconf = ScenarioConfiguration.getNativeConfiguration();
+    nativeconf.addResource(TestConstants.BZIP2_COMPRESS_CONF_PATH);
+    final Job nativejob = CompressMapper.getCompressJob("nativebzip2", nativeconf);
+    nativejob.waitForCompletion(true);
+
+    final Configuration hadoopconf = ScenarioConfiguration.getNormalConfiguration();
+    hadoopconf.addResource(TestConstants.BZIP2_COMPRESS_CONF_PATH);
+    final Job hadoopjob = CompressMapper.getCompressJob("hadoopbzip2", hadoopconf);
+    hadoopjob.waitForCompletion(true);
+
+    final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativebzip2",
+        CompressMapper.outputFileDir + "hadoopbzip2");
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Test
+  public void testLz4Compress() throws Exception {
+    final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+    nativeConf.addResource(TestConstants.LZ4_COMPRESS_CONF_PATH);
+    final Job nativeJob = CompressMapper.getCompressJob("nativelz4", nativeConf);
+    nativeJob.waitForCompletion(true);
+
+    final Configuration hadoopConf = ScenarioConfiguration.getNormalConfiguration();
+    hadoopConf.addResource(TestConstants.LZ4_COMPRESS_CONF_PATH);
+    final Job hadoopJob = CompressMapper.getCompressJob("hadooplz4", hadoopConf);
+    hadoopJob.waitForCompletion(true);
+    final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativelz4",
+        CompressMapper.outputFileDir + "hadooplz4");
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Test
+  public void testDefaultCompress() throws Exception {
+    final Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+    nativeConf.addResource(TestConstants.DEFAULT_COMPRESS_CONF_PATH);
+    final Job nativeJob = CompressMapper.getCompressJob("nativedefault", nativeConf);
+    nativeJob.waitForCompletion(true);
+
+    final Configuration hadoopConf = ScenarioConfiguration.getNormalConfiguration();
+    hadoopConf.addResource(TestConstants.DEFAULT_COMPRESS_CONF_PATH);
+    final Job hadoopJob = CompressMapper.getCompressJob("hadoopdefault", hadoopConf);
+    hadoopJob.waitForCompletion(true);
+    final boolean compareRet = ResultVerifier.verify(CompressMapper.outputFileDir + "nativedefault",
+        CompressMapper.outputFileDir + "hadoopdefault");
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Before
+  public void startUp() throws Exception {
+    final ScenarioConfiguration conf = new ScenarioConfiguration();
+    final FileSystem fs = FileSystem.get(conf);
+    final Path path = new Path(CompressMapper.inputFile);
+    fs.delete(path);
+    if (!fs.exists(path)) {
+      new TestInputFile(ScenarioConfiguration.getNormalConfiguration().getInt(
+          TestConstants.NATIVETASK_COMPRESS_FILESIZE, 100000),
+          Text.class.getName(), Text.class.getName(), conf)
+      .createSequenceTestFile(CompressMapper.inputFile);
+
+    }
+    fs.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java
new file mode 100644
index 0000000..ce23780
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/HashSumReducer.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class HashSumReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, IntWritable> {
+
+  ByteArrayOutputStream os = new ByteArrayOutputStream();
+  DataOutputStream dos = new DataOutputStream(os);
+
+  @Override
+  public void reduce(KTYPE key, Iterable<VTYPE> values, Context context) throws IOException, InterruptedException {
+    int hashSum = 0;
+    for (final VTYPE val : values) {
+      if (val instanceof Writable) {
+        os.reset();
+        ((Writable) val).write(dos);
+        final int hash = Arrays.hashCode(os.toByteArray());
+        hashSum += hash;
+      }
+    }
+
+    context.write(key, new IntWritable(hashSum));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java
new file mode 100644
index 0000000..6d683f8
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVJob.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import java.io.IOException;
+import java.util.zip.CRC32;
+
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.nativetask.testutil.BytesFactory;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class KVJob {
+  public static final String INPUTPATH = "nativetask.kvtest.inputfile.path";
+  public static final String OUTPUTPATH = "nativetask.kvtest.outputfile.path";
+  Job job = null;
+
+  public static class ValueMapper<KTYPE, VTYPE> extends Mapper<KTYPE, VTYPE, KTYPE, VTYPE> {
+    @Override
+    public void map(KTYPE key, VTYPE value, Context context) throws IOException, InterruptedException {
+      context.write(key, value);
+    }
+  }
+
+  public static class KVMReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, VTYPE> {
+    public void reduce(KTYPE key, VTYPE value, Context context) throws IOException, InterruptedException {
+      context.write(key, value);
+    }
+  }
+
+  public static class KVReducer<KTYPE, VTYPE> extends Reducer<KTYPE, VTYPE, KTYPE, VTYPE> {
+
+    @Override
+    public void reduce(KTYPE key, Iterable<VTYPE> values, Context context) throws IOException, InterruptedException {
+      long resultlong = 0;// 8 bytes match BytesFactory.fromBytes function
+      final CRC32 crc32 = new CRC32();
+      for (final VTYPE val : values) {
+        crc32.reset();
+        crc32.update(BytesFactory.toBytes(val));
+        resultlong += crc32.getValue();
+      }
+      final VTYPE V = null;
+      context.write(key, (VTYPE) BytesFactory.newObject(Longs.toByteArray(resultlong), V.getClass().getName()));
+    }
+  }
+
+  public KVJob(String jobname, Configuration conf, Class<?> keyclass, Class<?> valueclass, String inputpath,
+      String outputpath) throws Exception {
+    job = new Job(conf, jobname);
+    job.setJarByClass(KVJob.class);
+    job.setMapperClass(KVJob.ValueMapper.class);
+    job.setOutputKeyClass(keyclass);
+    job.setMapOutputValueClass(valueclass);
+    
+    if (conf.get(TestConstants.NATIVETASK_KVTEST_CREATEFILE).equals("true")) {
+      final FileSystem fs = FileSystem.get(conf);
+      fs.delete(new Path(inputpath), true);
+      fs.close();
+      final TestInputFile testfile = new TestInputFile(Integer.valueOf(conf.get(
+          TestConstants.FILESIZE_KEY, "1000")),
+          keyclass.getName(), valueclass.getName(), conf);
+      testfile.createSequenceTestFile(inputpath);
+
+    }
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.addInputPath(job, new Path(inputpath));
+    FileOutputFormat.setOutputPath(job, new Path(outputpath));
+  }
+
+  public void runJob() throws Exception {
+
+    job.waitForCompletion(true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
new file mode 100644
index 0000000..1e08854
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/KVTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class KVTest {
+  private static Class<?>[] keyclasses = null;
+  private static Class<?>[] valueclasses = null;
+  private static String[] keyclassNames = null;
+  private static String[] valueclassNames = null;
+
+  private static Configuration nativekvtestconf = ScenarioConfiguration.getNativeConfiguration();
+  private static Configuration hadoopkvtestconf = ScenarioConfiguration.getNormalConfiguration();
+  static {
+    nativekvtestconf.addResource(TestConstants.KVTEST_CONF_PATH);
+    hadoopkvtestconf.addResource(TestConstants.KVTEST_CONF_PATH);
+  }
+
+  @Parameters(name = "key:{0}\nvalue:{1}")
+  public static Iterable<Class<?>[]> data() {
+    final String valueclassesStr = nativekvtestconf
+        .get(TestConstants.NATIVETASK_KVTEST_VALUECLASSES);
+    System.out.println(valueclassesStr);
+    valueclassNames = valueclassesStr.replaceAll("\\s", "").split(";");// delete
+    // " "
+    final ArrayList<Class<?>> tmpvalueclasses = new ArrayList<Class<?>>();
+    for (int i = 0; i < valueclassNames.length; i++) {
+      try {
+        if (valueclassNames[i].equals("")) {
+          continue;
+        }
+        tmpvalueclasses.add(Class.forName(valueclassNames[i]));
+      } catch (final ClassNotFoundException e) {
+        e.printStackTrace();
+      }
+    }
+    valueclasses = tmpvalueclasses.toArray(new Class[tmpvalueclasses.size()]);
+    final String keyclassesStr = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_KEYCLASSES);
+    System.out.println(keyclassesStr);
+    keyclassNames = keyclassesStr.replaceAll("\\s", "").split(";");// delete
+    // " "
+    final ArrayList<Class<?>> tmpkeyclasses = new ArrayList<Class<?>>();
+    for (int i = 0; i < keyclassNames.length; i++) {
+      try {
+        if (keyclassNames[i].equals("")) {
+          continue;
+        }
+        tmpkeyclasses.add(Class.forName(keyclassNames[i]));
+      } catch (final ClassNotFoundException e) {
+        e.printStackTrace();
+      }
+    }
+    keyclasses = tmpkeyclasses.toArray(new Class[tmpkeyclasses.size()]);
+    final Class<?>[][] kvgroup = new Class<?>[keyclassNames.length * valueclassNames.length][2];
+    for (int i = 0; i < keyclassNames.length; i++) {
+      final int tmpindex = i * valueclassNames.length;
+      for (int j = 0; j < valueclassNames.length; j++) {
+        kvgroup[tmpindex + j][0] = keyclasses[i];
+        kvgroup[tmpindex + j][1] = valueclasses[j];
+      }
+    }
+    return Arrays.asList(kvgroup);
+  }
+
+  private final Class<?> keyclass;
+  private final Class<?> valueclass;
+
+  public KVTest(Class<?> keyclass, Class<?> valueclass) {
+    this.keyclass = keyclass;
+    this.valueclass = valueclass;
+
+  }
+
+  @Test
+  public void testKVCompability() {
+
+    try {
+      final String nativeoutput = this.runNativeTest(
+          "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass);
+      final String normaloutput = this.runNormalTest(
+          "Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass);
+      final boolean compareRet = ResultVerifier.verify(normaloutput, nativeoutput);
+      final String input = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/"
+          + keyclass.getName()
+          + "/" + valueclass.getName();
+      if(compareRet){
+        final FileSystem fs = FileSystem.get(hadoopkvtestconf);
+        fs.delete(new Path(nativeoutput), true);
+        fs.delete(new Path(normaloutput), true);
+        fs.delete(new Path(input), true);
+        fs.close();
+      }
+      assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+    } catch (final IOException e) {
+      assertEquals("test run exception:", null, e);
+    } catch (final Exception e) {
+      assertEquals("test run exception:", null, e);
+    }
+  }
+
+  @Before
+  public void startUp() {
+
+  }
+
+  private String runNativeTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws IOException {
+    final String inputpath = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/"
+        + keyclass.getName()
+        + "/" + valueclass.getName();
+    final String outputpath = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_OUTPUTDIR) + "/"
+        + keyclass.getName() + "/" + valueclass.getName();
+    // if output file exists ,then delete it
+    final FileSystem fs = FileSystem.get(nativekvtestconf);
+    fs.delete(new Path(outputpath));
+    fs.close();
+    nativekvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
+    try {
+      final KVJob keyJob = new KVJob(jobname, nativekvtestconf, keyclass, valueclass, inputpath, outputpath);
+      keyJob.runJob();
+    } catch (final Exception e) {
+      return "native testcase run time error.";
+    }
+    return outputpath;
+  }
+
+  private String runNormalTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws IOException {
+    final String inputpath = hadoopkvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/"
+        + keyclass.getName()
+        + "/" + valueclass.getName();
+    final String outputpath = hadoopkvtestconf
+        .get(TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR)
+        + "/"
+        + keyclass.getName() + "/" + valueclass.getName();
+    // if output file exists ,then delete it
+    final FileSystem fs = FileSystem.get(hadoopkvtestconf);
+    fs.delete(new Path(outputpath));
+    fs.close();
+    hadoopkvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false");
+    try {
+      final KVJob keyJob = new KVJob(jobname, hadoopkvtestconf, keyclass, valueclass, inputpath, outputpath);
+      keyJob.runJob();
+    } catch (final Exception e) {
+      return "normal testcase run time error.";
+    }
+    return outputpath;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
new file mode 100644
index 0000000..900b058
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/LargeKVTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.junit.Test;
+
+public class LargeKVTest {
+
+  @Test
+  public void testKeySize() {
+    runKVSizeTests(Text.class, IntWritable.class);
+  }
+
+  @Test
+  public void testValueSize() {
+    runKVSizeTests(IntWritable.class, Text.class);
+  }
+
+  private static Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+  private static Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+  static {
+    nativeConf.addResource(TestConstants.KVTEST_CONF_PATH);
+    nativeConf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
+    normalConf.addResource(TestConstants.KVTEST_CONF_PATH);
+    normalConf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false");
+  }
+
+  public void runKVSizeTests(Class<?> keyClass, Class<?> valueClass) {
+    if (!keyClass.equals(Text.class) && !valueClass.equals(Text.class)) {
+      return;
+    }
+    final int deafult_KVSize_Maximum = 1 << 22; // 4M
+    final int KVSize_Maximu = normalConf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST,
+        deafult_KVSize_Maximum);
+    try {
+
+      for (int i = 65536; i <= KVSize_Maximu; i *= 4) {
+        int min = i / 4;
+        int max = i;
+        nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+        nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+        normalConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
+        normalConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
+
+        System.out.println("===KV Size Test: min size: " + min + ", max size: " + max + ", keyClass: "
+            + keyClass.getName() + ", valueClass: " + valueClass.getName());
+
+        final String nativeOutPut = runNativeLargeKVTest("Test Large Value Size:" + String.valueOf(i), keyClass,
+            valueClass, nativeConf);
+        final String normalOutPut = this.runNormalLargeKVTest("Test Large Key Size:" + String.valueOf(i), keyClass,
+            valueClass, normalConf);
+        final boolean compareRet = ResultVerifier.verify(normalOutPut, nativeOutPut);
+        final String reason = "keytype: " + keyClass.getName() + ", valuetype: " + valueClass.getName()
+            + ", failed with " + (keyClass.equals(Text.class) ? "key" : "value") + ", min size: " + min
+            + ", max size: " + max + ", normal out: " + normalOutPut + ", native Out: " + nativeOutPut;
+        assertEquals(reason, true, compareRet);
+      }
+    } catch (final Exception e) {
+      // TODO: handle exception
+      // assertEquals("test run exception:", null, e);
+      e.printStackTrace();
+    }
+  }
+
+  private String runNativeLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf)
+      throws Exception {
+    final String inputpath = conf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/LargeKV/" + keyclass.getName()
+        + "/" + valueclass.getName();
+    final String outputpath = conf.get(TestConstants.NATIVETASK_KVTEST_OUTPUTDIR) + "/LargeKV/" + keyclass.getName()
+        + "/" + valueclass.getName();
+    // if output file exists ,then delete it
+    final FileSystem fs = FileSystem.get(conf);
+    fs.delete(new Path(outputpath), true);
+    fs.close();
+    try {
+      final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath);
+      keyJob.runJob();
+    } catch (final Exception e) {
+      return "normal testcase run time error.";
+    }
+    return outputpath;
+  }
+
+  private String runNormalLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf)
+      throws IOException {
+    final String inputpath = conf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/LargeKV/" + keyclass.getName()
+        + "/" + valueclass.getName();
+    final String outputpath = conf.get(TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR) + "/LargeKV/"
+        + keyclass.getName() + "/" + valueclass.getName();
+    // if output file exists ,then delete it
+    final FileSystem fs = FileSystem.get(conf);
+    fs.delete(new Path(outputpath), true);
+    fs.close();
+    try {
+      final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath);
+      keyJob.runJob();
+    } catch (final Exception e) {
+      return "normal testcase run time error.";
+    }
+    return outputpath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java
new file mode 100644
index 0000000..a194697
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/kvtest/TestInputFile.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.kvtest;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapred.nativetask.testutil.BytesFactory;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+
+
+public class TestInputFile {
+	
+  public static class KVSizeScope {
+    private static final int DefaultMinNum = 1;
+    private static final int DefaultMaxNum = 64;
+
+    public int minBytesNum;
+    public int maxBytesNum;
+
+    public KVSizeScope() {
+      this.minBytesNum = DefaultMinNum;
+      this.maxBytesNum = DefaultMaxNum;
+    }
+
+    public KVSizeScope(int min, int max) {
+      this.minBytesNum = min;
+      this.maxBytesNum = max;
+    }
+  }
+
+  private static HashMap<String, KVSizeScope> map = new HashMap<String, KVSizeScope>();
+
+  private byte[] databuf = null;
+  private final String keyClsName, valueClsName;
+  private int filesize = 0;
+  private int keyMaxBytesNum, keyMinBytesNum;
+  private int valueMaxBytesNum, valueMinBytesNum;
+  private SequenceFile.Writer writer = null;
+  Random r = new Random();
+  public static final int DATABUFSIZE = 1 << 22; // 4M
+
+  private enum State {
+    KEY, VALUE
+  };
+  
+  static {
+    map.put(BooleanWritable.class.getName(), new KVSizeScope(1, 1));
+    map.put(DoubleWritable.class.getName(), new KVSizeScope(8, 8));
+    map.put(FloatWritable.class.getName(), new KVSizeScope(4, 4));
+    map.put(VLongWritable.class.getName(), new KVSizeScope(8, 8));
+    map.put(ByteWritable.class.getName(), new KVSizeScope(1, 1));
+    map.put(LongWritable.class.getName(), new KVSizeScope(8, 8));
+    map.put(VIntWritable.class.getName(), new KVSizeScope(4, 4));
+    map.put(IntWritable.class.getName(), new KVSizeScope(4, 4));
+  }
+  
+  public TestInputFile(int filesize, String keytype, String valuetype, Configuration conf) throws Exception {
+    this.filesize = filesize;
+    this.databuf = new byte[DATABUFSIZE];
+    this.keyClsName = keytype;
+    this.valueClsName = valuetype;
+    final int defaultMinBytes = conf.getInt(TestConstants.NATIVETASK_KVSIZE_MIN, 1);
+    final int defaultMaxBytes = conf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX, 64);
+
+    if (map.get(keytype) != null) {
+      keyMinBytesNum = map.get(keytype).minBytesNum;
+      keyMaxBytesNum = map.get(keytype).maxBytesNum;
+    } else {
+      keyMinBytesNum = defaultMinBytes;
+      keyMaxBytesNum = defaultMaxBytes;
+    }
+
+    if (map.get(valuetype) != null) {
+      valueMinBytesNum = map.get(valuetype).minBytesNum;
+      valueMaxBytesNum = map.get(valuetype).maxBytesNum;
+    } else {
+      valueMinBytesNum = defaultMinBytes;
+      valueMaxBytesNum = defaultMaxBytes;
+    }
+  }
+
+  public void createSequenceTestFile(String filepath) throws Exception {
+    int FULL_BYTE_SPACE = 256;
+    createSequenceTestFile(filepath, FULL_BYTE_SPACE);
+  }
+
+  public void createSequenceTestFile(String filepath, int base) throws Exception {
+    createSequenceTestFile(filepath, base, (byte)0);
+  }
+  
+  public void createSequenceTestFile(String filepath, int base,  byte start) throws Exception {
+    System.out.println("create file " + filepath);
+    System.out.println(keyClsName + " " + valueClsName);
+    Class<?> tmpkeycls, tmpvaluecls;
+    try {
+      tmpkeycls = Class.forName(keyClsName);
+    } catch (final ClassNotFoundException e) {
+      throw new Exception("key class not found: ", e);
+    }
+    try {
+      tmpvaluecls = Class.forName(valueClsName);
+    } catch (final ClassNotFoundException e) {
+      throw new Exception("key class not found: ", e);
+    }
+    try {
+      final Path outputfilepath = new Path(filepath);
+      final ScenarioConfiguration conf= new ScenarioConfiguration();
+      final FileSystem hdfs = outputfilepath.getFileSystem(conf);
+      writer = new SequenceFile.Writer(hdfs, conf, outputfilepath, tmpkeycls, tmpvaluecls);
+    } catch (final Exception e) {
+      e.printStackTrace();
+    }
+
+    int tmpfilesize = this.filesize;
+    while (tmpfilesize > DATABUFSIZE) {
+      nextRandomBytes(databuf, base, start);
+      final int size = flushBuf(DATABUFSIZE);
+      tmpfilesize -= size;
+    }
+    nextRandomBytes(databuf, base, start);
+    flushBuf(tmpfilesize);
+
+    if (writer != null) {
+      IOUtils.closeStream(writer);
+    } else {
+      throw new Exception("no writer to create sequenceTestFile!");
+    }
+  }
+  
+  private void nextRandomBytes(byte[] buf, int base) {
+    nextRandomBytes(buf, base, (byte)0);
+  }
+  
+  private void nextRandomBytes(byte[] buf, int base, byte start) {
+    r.nextBytes(buf);
+    for (int i = 0; i < buf.length; i++) {
+      buf[i] = (byte) ((buf[i] & 0xFF) % base + start);
+    }
+  }
+
+  private int flushBuf(int buflen) throws Exception {
+    final Random r = new Random();
+    int keybytesnum = 0;
+    int valuebytesnum = 0;
+    int offset = 0;
+
+    while (offset < buflen) {
+      final int remains = buflen - offset;
+      keybytesnum = keyMaxBytesNum;
+      if (keyMaxBytesNum != keyMinBytesNum) {
+        keybytesnum = keyMinBytesNum + r.nextInt(keyMaxBytesNum - keyMinBytesNum);
+      }
+
+      valuebytesnum = valueMaxBytesNum;
+      if (valueMaxBytesNum != valueMinBytesNum) {
+        valuebytesnum = valueMinBytesNum + r.nextInt(valueMaxBytesNum - valueMinBytesNum);
+      }
+
+      if (keybytesnum + valuebytesnum > remains) {
+        break;
+      }
+
+      final byte[] key = new byte[keybytesnum];
+      final byte[] value = new byte[valuebytesnum];
+
+      System.arraycopy(databuf, offset, key, 0, keybytesnum);
+      offset += keybytesnum;
+
+      System.arraycopy(databuf, offset, value, 0, valuebytesnum);
+      offset += valuebytesnum;
+      
+      try {
+        writer.append(BytesFactory.newObject(key, this.keyClsName), BytesFactory.newObject(value, this.valueClsName));
+      } catch (final IOException e) {
+        e.printStackTrace();
+        throw new Exception("sequence file create failed", e);
+      }
+    }
+    return offset;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
new file mode 100644
index 0000000..b7b03e7
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.nonsorttest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.kvtest.TestInputFile;
+import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
+import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
+import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NonSortTest {
+
+  @Test
+  public void nonSortTest() throws Exception {
+    Configuration nativeConf = ScenarioConfiguration.getNativeConfiguration();
+    nativeConf.addResource(TestConstants.NONSORT_TEST_CONF);
+    nativeConf.set(TestConstants.NATIVETASK_MAP_OUTPUT_SORT, "false");
+    String inputpath = nativeConf.get(TestConstants.NONSORT_TEST_INPUTDIR);
+    String outputpath = nativeConf.get(TestConstants.NONSORT_TEST_NATIVE_OUTPUT);
+    final Job nativeNonSort = getJob(nativeConf, "NativeNonSort", inputpath, outputpath);
+    nativeNonSort.waitForCompletion(true);
+
+    Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
+    normalConf.addResource(TestConstants.NONSORT_TEST_CONF);
+    inputpath = normalConf.get(TestConstants.NONSORT_TEST_INPUTDIR);
+    outputpath = normalConf.get(TestConstants.NONSORT_TEST_NORMAL_OUTPUT);
+    final Job hadoopWithSort = getJob(normalConf, "NormalJob", inputpath, outputpath);
+    hadoopWithSort.waitForCompletion(true);
+
+    final boolean compareRet = ResultVerifier.verify(nativeConf.get(TestConstants.NONSORT_TEST_NATIVE_OUTPUT),
+        normalConf.get(TestConstants.NONSORT_TEST_NORMAL_OUTPUT));
+    assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
+  }
+
+  @Before
+  public void startUp() throws Exception {
+    final ScenarioConfiguration configuration = new ScenarioConfiguration();
+    configuration.addNonSortTestConf();
+    final FileSystem fs = FileSystem.get(configuration);
+    final Path path = new Path(configuration.get(TestConstants.NONSORT_TEST_INPUTDIR));
+    if (!fs.exists(path)) {
+      new TestInputFile(configuration.getInt("nativetask.nonsorttest.filesize", 10000000), Text.class.getName(),
+          Text.class.getName(), configuration).createSequenceTestFile(path.toString());
+    }
+    fs.close();
+  }
+
+  private Job getJob(Configuration conf, String jobName, String inputpath, String outputpath) throws IOException {
+    final FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(new Path(outputpath))) {
+      fs.delete(new Path(outputpath), true);
+    }
+    fs.close();
+    final Job job = new Job(conf, jobName);
+    job.setJarByClass(NonSortTestMR.class);
+    job.setMapperClass(NonSortTestMR.Map.class);
+    job.setReducerClass(NonSortTestMR.KeyHashSumReduce.class);
+    job.setOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setOutputValueClass(LongWritable.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    FileInputFormat.addInputPath(job, new Path(inputpath));
+    FileOutputFormat.setOutputPath(job, new Path(outputpath));
+    return job;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java
new file mode 100644
index 0000000..4ca2449
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/nonsorttest/NonSortTestMR.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.nonsorttest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class NonSortTestMR {
+
+  public static class Map extends Mapper<Object, Text, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private final Text word = new Text();
+
+    @Override
+    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+      final String line = value.toString();
+      final StringTokenizer tokenizer = new StringTokenizer(line);
+      while (tokenizer.hasMoreTokens()) {
+        word.set(tokenizer.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+  public static class KeyHashSumReduce extends Reducer<Text, IntWritable, Text, LongWritable> {
+    long sum = 0;
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(os);
+
+    @Override
+    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
+    InterruptedException {
+      for (final IntWritable val : values) {
+        os.reset();
+        key.write(dos);
+        final int hash = Arrays.hashCode(os.toByteArray());
+        sum += hash;
+      }
+    }
+
+    @Override
+    public void cleanup(Context context) throws IOException, InterruptedException {
+      context.write(new Text("NonSortTest"), new LongWritable(sum));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java
new file mode 100644
index 0000000..5185371
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/BytesFactory.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.testutil;
+
+import java.util.Random;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapred.nativetask.util.BytesUtil;
+
+
+public class BytesFactory {
+  public static Random r = new Random();
+
+  public static Object newObject(byte[] seed, String className) {
+    r.setSeed(seed.hashCode());
+    if (className.equals(IntWritable.class.getName())) {
+      return new IntWritable(Ints.fromByteArray(seed));
+    } else if (className.equals(FloatWritable.class.getName())) {
+      return new FloatWritable(r.nextFloat());
+    } else if (className.equals(DoubleWritable.class.getName())) {
+      return new DoubleWritable(r.nextDouble());
+    } else if (className.equals(LongWritable.class.getName())) {
+      return new LongWritable(Longs.fromByteArray(seed));
+    } else if (className.equals(VIntWritable.class.getName())) {
+      return new VIntWritable(Ints.fromByteArray(seed));
+    } else if (className.equals(VLongWritable.class.getName())) {
+      return new VLongWritable(Longs.fromByteArray(seed));
+    } else if (className.equals(BooleanWritable.class.getName())) {
+      return new BooleanWritable(seed[0] % 2 == 1 ? true : false);
+    } else if (className.equals(Text.class.getName())) {
+      return new Text(BytesUtil.toStringBinary(seed));
+    } else if (className.equals(ByteWritable.class.getName())) {
+      return new ByteWritable(seed.length > 0 ? seed[0] : 0);
+    } else if (className.equals(BytesWritable.class.getName())) {
+      return new BytesWritable(seed);
+    } else if (className.equals(UTF8.class.getName())) {
+      return new UTF8(BytesUtil.toStringBinary(seed));
+    } else if (className.equals(MockValueClass.class.getName())) {
+      return new MockValueClass(seed);
+    } else {
+      return null;
+    }
+  }
+
+
+  public static <VTYPE> byte[] fromBytes(byte[] bytes) throws Exception {
+    throw new Exception("Not supported");
+  }
+
+  public static <VTYPE> byte[] toBytes(VTYPE obj) {
+    final String className = obj.getClass().getName();
+    if (className.equals(IntWritable.class.getName())) {
+      return Ints.toByteArray(((IntWritable) obj).get());
+    } else if (className.equals(FloatWritable.class.getName())) {
+      return BytesUtil.toBytes(((FloatWritable) obj).get());
+    } else if (className.equals(DoubleWritable.class.getName())) {
+      return BytesUtil.toBytes(((DoubleWritable) obj).get());
+    } else if (className.equals(LongWritable.class.getName())) {
+      return Longs.toByteArray(((LongWritable) obj).get());
+    } else if (className.equals(VIntWritable.class.getName())) {
+      return Ints.toByteArray(((VIntWritable) obj).get());
+    } else if (className.equals(VLongWritable.class.getName())) {
+      return Longs.toByteArray(((VLongWritable) obj).get());
+    } else if (className.equals(BooleanWritable.class.getName())) {
+      return BytesUtil.toBytes(((BooleanWritable) obj).get());
+    } else if (className.equals(Text.class.getName())) {
+      return ((Text)obj).copyBytes();
+    } else if (className.equals(ByteWritable.class.getName())) {
+      return Ints.toByteArray((int) ((ByteWritable) obj).get());
+    } else if (className.equals(BytesWritable.class.getName())) {
+      // TODO: copyBytes instead?
+      return ((BytesWritable) obj).getBytes();
+    } else {
+      return new byte[0];
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cba48f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/EnforceNativeOutputCollectorDelegator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/EnforceNativeOutputCollectorDelegator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/EnforceNativeOutputCollectorDelegator.java
new file mode 100644
index 0000000..a0f7d64
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/test/java/org/apache/hadoop/mapred/nativetask/testutil/EnforceNativeOutputCollectorDelegator.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.testutil;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.NativeMapOutputCollectorDelegator;
+
+public class EnforceNativeOutputCollectorDelegator<K, V> extends NativeMapOutputCollectorDelegator<K, V> {
+  private boolean nativetaskloaded = false;
+
+  @Override
+  public void init(Context context)
+ throws IOException, ClassNotFoundException {
+    try {
+      super.init(context);
+      nativetaskloaded = true;
+    } catch (final Exception e) {
+      nativetaskloaded = false;
+      System.err.println("load nativetask lib failed, Native-Task Delegation is disabled");
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void collect(K key, V value, int partition) throws IOException, InterruptedException {
+    if (this.nativetaskloaded) {
+      super.collect(key, value, partition);
+    } else {
+      // nothing to do.
+    }
+  }
+}


Mime
View raw message