ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [14/50] [abbrv] ignite git commit: IGNITE-4507: Hadoop: added direct output support for combiner. This closes #1434.
Date Fri, 17 Feb 2017 09:59:24 GMT
IGNITE-4507: Hadoop: added direct output support for combiner. This closes #1434.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/476b089b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/476b089b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/476b089b

Branch: refs/heads/ignite-1.9
Commit: 476b089b1dd4b4c5d3b6ae21e1b3b2c010c086ac
Parents: b6005b0
Author: tledkov-gridgain <tledkov@gridgain.com>
Authored: Fri Jan 20 17:33:34 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Fri Jan 20 17:35:39 2017 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopTaskContext.java    | 10 +++
 .../hadoop/impl/v1/HadoopV1MapTask.java         | 89 +++++++++++---------
 .../hadoop/impl/v1/HadoopV1ReduceTask.java      | 69 +++++++++------
 .../hadoop/impl/v2/HadoopV2Context.java         | 10 ---
 .../hadoop/impl/v2/HadoopV2MapTask.java         | 18 ++--
 .../hadoop/impl/v2/HadoopV2ReduceTask.java      | 14 +++
 .../hadoop/impl/v2/HadoopV2TaskContext.java     |  1 +
 .../hadoop/shuffle/HadoopShuffleJob.java        |  7 --
 .../shuffle/direct/HadoopDirectDataInput.java   |  2 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java | 12 ++-
 .../impl/HadoopAbstractMapReduceTest.java       |  2 +
 .../impl/HadoopMapReduceEmbeddedSelfTest.java   |  6 +-
 12 files changed, 145 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index dddd017..d6e9394 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -207,4 +207,14 @@ public abstract class HadoopTaskContext {
      * @throws IgniteCheckedException On any error in callable.
      */
     public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException;
+
+    /**
+     * Callback invoked from mapper thread when map is finished.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onMapperFinished() throws IgniteCheckedException {
+        if (output instanceof HadoopMapperAwareTaskOutput)
+            ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
index 65ff280..2aa4292 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -45,7 +46,7 @@ public class HadoopV1MapTask extends HadoopV1Task {
     /**
      * Constructor.
      *
-     * @param taskInfo 
+     * @param taskInfo Task info.
      */
     public HadoopV1MapTask(HadoopTaskInfo taskInfo) {
         super(taskInfo);
@@ -56,67 +57,79 @@ public class HadoopV1MapTask extends HadoopV1Task {
     @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
         HadoopJob job = taskCtx.job();
 
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+        HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
 
-        JobConf jobConf = ctx.jobConf();
+        if (taskCtx.taskInfo().hasMapperIndex())
+            HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+        else
+            HadoopMapperUtils.clearMapperIndex();
 
-        InputFormat inFormat = jobConf.getInputFormat();
+        try {
+            JobConf jobConf = taskCtx0.jobConf();
 
-        HadoopInputSplit split = info().inputSplit();
+            InputFormat inFormat = jobConf.getInputFormat();
 
-        InputSplit nativeSplit;
+            HadoopInputSplit split = info().inputSplit();
 
-        if (split instanceof HadoopFileBlock) {
-            HadoopFileBlock block = (HadoopFileBlock)split;
+            InputSplit nativeSplit;
 
-            nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(),
block.length(), EMPTY_HOSTS);
-        }
-        else
-            nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+            if (split instanceof HadoopFileBlock) {
+                HadoopFileBlock block = (HadoopFileBlock)split;
 
-        assert nativeSplit != null;
+                nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(),
block.length(), EMPTY_HOSTS);
+            }
+            else
+                nativeSplit = (InputSplit)taskCtx0.getNativeSplit(split);
 
-        Reporter reporter = new HadoopV1Reporter(taskCtx);
+            assert nativeSplit != null;
 
-        HadoopV1OutputCollector collector = null;
+            Reporter reporter = new HadoopV1Reporter(taskCtx);
 
-        try {
-            collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
-                fileName(), ctx.attemptId());
+            HadoopV1OutputCollector collector = null;
 
-            RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+            try {
+                collector = collector(jobConf, taskCtx0, !job.info().hasCombiner() &&
!job.info().hasReducer(),
+                    fileName(), taskCtx0.attemptId());
 
-            Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+                RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
 
-            Object key = reader.createKey();
-            Object val = reader.createValue();
+                Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
 
-            assert mapper != null;
+                Object key = reader.createKey();
+                Object val = reader.createValue();
+
+                assert mapper != null;
 
-            try {
                 try {
-                    while (reader.next(key, val)) {
-                        if (isCancelled())
-                            throw new HadoopTaskCancelledException("Map task cancelled.");
+                    try {
+                        while (reader.next(key, val)) {
+                            if (isCancelled())
+                                throw new HadoopTaskCancelledException("Map task cancelled.");
+
+                            mapper.map(key, val, collector, reporter);
+                        }
 
-                        mapper.map(key, val, collector, reporter);
+                        taskCtx.onMapperFinished();
+                    }
+                    finally {
+                        mapper.close();
                     }
                 }
                 finally {
-                    mapper.close();
+                    collector.closeWriter();
                 }
+
+                collector.commit();
             }
-            finally {
-                collector.closeWriter();
-            }
+            catch (Exception e) {
+                if (collector != null)
+                    collector.abort();
 
-            collector.commit();
+                throw new IgniteCheckedException(e);
+            }
         }
-        catch (Exception e) {
-            if (collector != null)
-                collector.abort();
-
-            throw new IgniteCheckedException(e);
+        finally {
+            HadoopMapperUtils.clearMapperIndex();
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
index 92c024e..5c1dd15 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -53,49 +54,63 @@ public class HadoopV1ReduceTask extends HadoopV1Task {
     @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
         HadoopJob job = taskCtx.job();
 
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+        HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
 
-        JobConf jobConf = ctx.jobConf();
-
-        HadoopTaskInput input = taskCtx.input();
-
-        HadoopV1OutputCollector collector = null;
+        if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+            HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+        else
+            HadoopMapperUtils.clearMapperIndex();
 
         try {
-            collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(),
ctx.attemptId());
+            JobConf jobConf = taskCtx0.jobConf();
 
-            Reducer reducer;
-            if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
-                jobConf);
-            else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
-                jobConf);
+            HadoopTaskInput input = taskCtx.input();
 
-            assert reducer != null;
+            HadoopV1OutputCollector collector = null;
 
             try {
+                collector = collector(jobConf, taskCtx0, reduce || !job.info().hasReducer(),
fileName(), taskCtx0.attemptId());
+
+                Reducer reducer;
+                if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
+                    jobConf);
+                else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
+                    jobConf);
+
+                assert reducer != null;
+
                 try {
-                    while (input.next()) {
-                        if (isCancelled())
-                            throw new HadoopTaskCancelledException("Reduce task cancelled.");
+                    try {
+                        while (input.next()) {
+                            if (isCancelled())
+                                throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+                            reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                        }
 
-                        reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                        if (!reduce)
+                            taskCtx.onMapperFinished();
+                    }
+                    finally {
+                        reducer.close();
                     }
                 }
                 finally {
-                    reducer.close();
+                    collector.closeWriter();
                 }
+
+                collector.commit();
             }
-            finally {
-                collector.closeWriter();
-            }
+            catch (Exception e) {
+                if (collector != null)
+                    collector.abort();
 
-            collector.commit();
+                throw new IgniteCheckedException(e);
+            }
         }
-        catch (Exception e) {
-            if (collector != null)
-                collector.abort();
-
-            throw new IgniteCheckedException(e);
+        finally {
+            if (!reduce)
+                HadoopMapperUtils.clearMapperIndex();
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
index eec0636..1f4e675 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
@@ -154,16 +154,6 @@ public class HadoopV2Context extends JobContextImpl implements MapContext,
Reduc
         }
     }
 
-    /**
-     * Callback invoked from mapper thread when map is finished.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void onMapperFinished() throws IgniteCheckedException {
-        if (output instanceof HadoopMapperAwareTaskOutput)
-            ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
-    }
-
     /** {@inheritDoc} */
     @Override public OutputCommitter getOutputCommitter() {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
index eb3b935..1519199 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
@@ -56,30 +56,32 @@ public class HadoopV2MapTask extends HadoopV2Task {
             HadoopMapperUtils.clearMapperIndex();
 
         try {
-            InputSplit nativeSplit = hadoopContext().getInputSplit();
+            HadoopV2Context hadoopCtx = hadoopContext();
+
+            InputSplit nativeSplit = hadoopCtx.getInputSplit();
 
             if (nativeSplit == null)
                 throw new IgniteCheckedException("Input split cannot be null.");
 
             InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
-                hadoopContext().getConfiguration());
+                hadoopCtx.getConfiguration());
 
-            RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext());
+            RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopCtx);
 
-            reader.initialize(nativeSplit, hadoopContext());
+            reader.initialize(nativeSplit, hadoopCtx);
 
-            hadoopContext().reader(reader);
+            hadoopCtx.reader(reader);
 
             HadoopJobInfo jobInfo = taskCtx.job().info();
 
             outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx);
 
-            Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration());
+            Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopCtx.getConfiguration());
 
             try {
-                mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
+                mapper.run(new WrappedMapper().getMapContext(hadoopCtx));
 
-                hadoopContext().onMapperFinished();
+                taskCtx.onMapperFinished();
             }
             finally {
                 closeWriter();

http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
index 930ec1d..09e0634 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 
 /**
@@ -53,10 +54,17 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
 
         JobContextImpl jobCtx = taskCtx.jobContext();
 
+        // Set mapper index for combiner tasks
+        if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+            HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+        else
+            HadoopMapperUtils.clearMapperIndex();
+
         try {
             outputFormat = reduce || !taskCtx.job().info().hasReducer() ? prepareWriter(jobCtx)
: null;
 
             Reducer reducer;
+
             if (reduce) reducer = ReflectionUtils.newInstance(jobCtx.getReducerClass(),
                 jobCtx.getConfiguration());
             else reducer = ReflectionUtils.newInstance(jobCtx.getCombinerClass(),
@@ -64,6 +72,9 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
 
             try {
                 reducer.run(new WrappedReducer().getReducerContext(hadoopContext()));
+
+                if (!reduce)
+                    taskCtx.onMapperFinished();
             }
             finally {
                 closeWriter();
@@ -84,6 +95,9 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
             throw new IgniteCheckedException(e);
         }
         finally {
+            if (!reduce)
+                HadoopMapperUtils.clearMapperIndex();
+
             if (err != null)
                 abort(outputFormat);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index d328550..475e43d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
 import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
 import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;

http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 318ead3..4bcc398 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -182,13 +182,6 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
         boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, true);
 
         if (stripeMappers0) {
-            if (job.info().hasCombiner()) {
-                log.info("Striped mapper output is disabled because it cannot be used together
with combiner [jobId=" +
-                    job.id() + ']');
-
-                stripeMappers0 = false;
-            }
-
             if (!embedded) {
                 log.info("Striped mapper output is disabled because it cannot be used in
external mode [jobId=" +
                     job.id() + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
index e3a713a..ef2905b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
@@ -48,7 +48,7 @@ public class HadoopDirectDataInput extends InputStream implements DataInput
{
 
     /** {@inheritDoc} */
     @Override public int read() throws IOException {
-        return readByte();
+        return (int)readByte() & 0xFF;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index a57efe6..339bf5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -122,7 +122,7 @@ public abstract class HadoopRunnableTask implements Callable<Void>
{
 
     /**
      * Implements actual task running.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException On error.
      */
     void call0() throws IgniteCheckedException {
         execStartTs = U.currentTimeMillis();
@@ -144,7 +144,15 @@ public abstract class HadoopRunnableTask implements Callable<Void>
{
             runTask(perfCntr);
 
             if (info.type() == MAP && job.info().hasCombiner()) {
-                ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(),
info.attempt(), null));
+                // Switch to combiner.
+                HadoopTaskInfo combineTaskInfo = new HadoopTaskInfo(COMBINE, info.jobId(),
info.taskNumber(),
+                    info.attempt(), null);
+
+                // Mapper and combiner share the same index.
+                if (ctx.taskInfo().hasMapperIndex())
+                    combineTaskInfo.mapperIndex(ctx.taskInfo().mapperIndex());
+
+                ctx.taskInfo(combineTaskInfo);
 
                 try {
                     runTask(perfCntr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
index 89005f6..cd997a4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
@@ -172,6 +172,8 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest
{
      */
     protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner,
boolean useNewReducer)
         throws Exception {
+        log.info("useNewMapper=" + useNewMapper + ", useNewCombiner=" + useNewCombiner +
", useNewReducer=" + useNewReducer);
+
         igfs.delete(new IgfsPath(PATH_OUTPUT), true);
 
         JobConf jobConf = new JobConf();

http://git-wip-us.apache.org/repos/asf/ignite/blob/476b089b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
index 8897a38..bce67f6 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -55,14 +55,14 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest
{
         return cfg;
     }
 
-    /*
+    /**
      * @throws Exception If fails.
      */
     public void testMultiReducerWholeMapReduceExecution() throws Exception {
         checkMultiReducerWholeMapReduceExecution(false);
     }
 
-    /*
+    /**
      * @throws Exception If fails.
      */
     public void testMultiReducerWholeMapReduceExecutionStriped() throws Exception {
@@ -100,6 +100,8 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest
{
 
             if (striped)
                 jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(),
"true");
+            else
+                jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(),
"false");
 
             jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
 


Mime
View raw message