ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [33/63] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode.
Date Tue, 27 Sep 2016 11:09:28 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
new file mode 100644
index 0000000..96fa892
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
+import org.apache.ignite.internal.processors.hadoop.HadoopTask;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1CleanupTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Partitioner;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1ReduceTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1SetupTask;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.unwrapSplit;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.FsCacheKey;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+
+/**
+ * Context for task execution.
+ */
+public class HadoopV2TaskContext extends HadoopTaskContext {
+    /** */
+    private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
+
+    /** Lazy per-user file system cache used by the Hadoop task. */
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
+        = createHadoopLazyConcurrentMap();
+
+    /**
+     * This method is called with reflection upon Job finish with class loader of each task.
+     * This will clean up all the Fs created for specific task.
+     * Each class loader sees uses its own instance of <code>fsMap<code/> since the class loaders
+     * are different.
+     *
+     * @throws IgniteCheckedException On error.
+     */
+    public static void close() throws IgniteCheckedException {
+        fsMap.close();
+    }
+
+    /**
+     * Check for combiner grouping support (available since Hadoop 2.3).
+     */
+    static {
+        boolean ok;
+
+        try {
+            JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator");
+
+            ok = true;
+        }
+        catch (NoSuchMethodException ignore) {
+            ok = false;
+        }
+
+        COMBINE_KEY_GROUPING_SUPPORTED = ok;
+    }
+
+    /** Flag is set if new context-object code is used for running the mapper. */
+    private final boolean useNewMapper;
+
+    /** Flag is set if new context-object code is used for running the reducer. */
+    private final boolean useNewReducer;
+
+    /** Flag is set if new context-object code is used for running the combiner. */
+    private final boolean useNewCombiner;
+
+    /** */
+    private final JobContextImpl jobCtx;
+
+    /** Set if task is to cancelling. */
+    private volatile boolean cancelled;
+
+    /** Current task. */
+    private volatile HadoopTask task;
+
+    /** Local node ID */
+    private final UUID locNodeId;
+
+    /** Counters for task. */
+    private final HadoopCounters cntrs = new HadoopCountersImpl();
+
+    /**
+     * @param taskInfo Task info.
+     * @param job Job.
+     * @param jobId Job ID.
+     * @param locNodeId Local node ID.
+     * @param jobConfDataInput DataInput for read JobConf.
+     */
+    public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId,
+        @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
+        super(taskInfo, job);
+        this.locNodeId = locNodeId;
+
+        // Before create JobConf instance we should set new context class loader.
+        ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
+
+        try {
+            JobConf jobConf = new JobConf();
+
+            try {
+                jobConf.readFields(jobConfDataInput);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException(e);
+            }
+
+            // For map-reduce jobs prefer local writes.
+            jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
+
+            jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId()));
+
+            useNewMapper = jobConf.getUseNewMapper();
+            useNewReducer = jobConf.getUseNewReducer();
+            useNewCombiner = jobConf.getCombinerClass() == null;
+        }
+        finally {
+            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
+        return cntrs.counter(grp, name, cls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopCounters counters() {
+        return cntrs;
+    }
+
+    /**
+     * Creates appropriate task from current task info.
+     *
+     * @return Task.
+     */
+    private HadoopTask createTask() {
+        boolean isAbort = taskInfo().type() == HadoopTaskType.ABORT;
+
+        switch (taskInfo().type()) {
+            case SETUP:
+                return useNewMapper ? new HadoopV2SetupTask(taskInfo()) : new HadoopV1SetupTask(taskInfo());
+
+            case MAP:
+                return useNewMapper ? new HadoopV2MapTask(taskInfo()) : new HadoopV1MapTask(taskInfo());
+
+            case REDUCE:
+                return useNewReducer ? new HadoopV2ReduceTask(taskInfo(), true) :
+                    new HadoopV1ReduceTask(taskInfo(), true);
+
+            case COMBINE:
+                return useNewCombiner ? new HadoopV2ReduceTask(taskInfo(), false) :
+                    new HadoopV1ReduceTask(taskInfo(), false);
+
+            case COMMIT:
+            case ABORT:
+                return useNewReducer ? new HadoopV2CleanupTask(taskInfo(), isAbort) :
+                    new HadoopV1CleanupTask(taskInfo(), isAbort);
+
+            default:
+                return null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run() throws IgniteCheckedException {
+        ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf().getClassLoader());
+
+        try {
+            try {
+                task = createTask();
+            }
+            catch (Throwable e) {
+                if (e instanceof Error)
+                    throw e;
+
+                throw transformException(e);
+            }
+
+            if (cancelled)
+                throw new HadoopTaskCancelledException("Task cancelled.");
+
+            try {
+                task.run(this);
+            }
+            catch (Throwable e) {
+                if (e instanceof Error)
+                    throw e;
+
+                throw transformException(e);
+            }
+        }
+        finally {
+            task = null;
+
+            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        cancelled = true;
+
+        HadoopTask t = task;
+
+        if (t != null)
+            t.cancel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
+        File locDir;
+
+        switch(taskInfo().type()) {
+            case MAP:
+            case REDUCE:
+                job().prepareTaskEnvironment(taskInfo());
+
+                locDir = taskLocalDir(locNodeId, taskInfo());
+
+                break;
+
+            default:
+                locDir = jobLocalDir(locNodeId, taskInfo().jobId());
+        }
+
+        ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf().getClassLoader());
+
+        try {
+            FileSystem.get(jobConf());
+
+            LocalFileSystem locFs = FileSystem.getLocal(jobConf());
+
+            locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
+        }
+        catch (Throwable e) {
+            if (e instanceof Error)
+                throw (Error)e;
+
+            throw transformException(e);
+        }
+        finally {
+            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
+        job().cleanupTaskEnvironment(taskInfo());
+    }
+
+    /**
+     * Creates Hadoop attempt ID.
+     *
+     * @return Attempt ID.
+     */
+    public TaskAttemptID attemptId() {
+        TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber());
+
+        return new TaskAttemptID(tid, taskInfo().attempt());
+    }
+
+    /**
+     * @param type Task type.
+     * @return Hadoop task type.
+     */
+    private TaskType taskType(HadoopTaskType type) {
+        switch (type) {
+            case SETUP:
+                return TaskType.JOB_SETUP;
+            case MAP:
+            case COMBINE:
+                return TaskType.MAP;
+
+            case REDUCE:
+                return TaskType.REDUCE;
+
+            case COMMIT:
+            case ABORT:
+                return TaskType.JOB_CLEANUP;
+
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Gets job configuration of the task.
+     *
+     * @return Job configuration.
+     */
+    public JobConf jobConf() {
+        return jobCtx.getJobConf();
+    }
+
+    /**
+     * Gets job context of the task.
+     *
+     * @return Job context.
+     */
+    public JobContextImpl jobContext() {
+        return jobCtx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
+        Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null);
+
+        if (partClsOld != null)
+            return new HadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf());
+
+        try {
+            return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf());
+        }
+        catch (ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Gets serializer for specified class.
+     *
+     * @param cls Class.
+     * @param jobConf Job configuration.
+     * @return Appropriate serializer.
+     */
+    @SuppressWarnings("unchecked")
+    private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
+        A.notNull(cls, "cls");
+
+        SerializationFactory factory = new SerializationFactory(jobConf);
+
+        Serialization<?> serialization = factory.getSerialization(cls);
+
+        if (serialization == null)
+            throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
+
+        if (serialization.getClass() == WritableSerialization.class)
+            return new HadoopWritableSerialization((Class<? extends Writable>)cls);
+
+        return new HadoopSerializationWrapper(serialization, cls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
+        return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf());
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
+        return getSerialization(jobCtx.getMapOutputValueClass(), jobConf());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Comparator<Object> sortComparator() {
+        return (Comparator<Object>)jobCtx.getSortComparator();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Comparator<Object> groupComparator() {
+        Comparator<?> res;
+
+        switch (taskInfo().type()) {
+            case COMBINE:
+                res = COMBINE_KEY_GROUPING_SUPPORTED ?
+                    jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator();
+
+                break;
+
+            case REDUCE:
+                res = jobContext().getGroupingComparator();
+
+                break;
+
+            default:
+                return null;
+        }
+
+        if (res != null && res.getClass() != sortComparator().getClass())
+            return (Comparator<Object>)res;
+
+        return null;
+    }
+
+    /**
+     * @param split Split.
+     * @return Native Hadoop split.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    public Object getNativeSplit(HadoopInputSplit split) throws IgniteCheckedException {
+        if (split instanceof HadoopExternalSplit)
+            return readExternalSplit((HadoopExternalSplit)split);
+
+        if (split instanceof HadoopSplitWrapper)
+            return unwrapSplit((HadoopSplitWrapper)split);
+
+        throw new IllegalStateException("Unknown split: " + split);
+    }
+
+    /**
+     * @param split External split.
+     * @return Native input split.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
+        Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
+
+        FileSystem fs;
+
+        try {
+            // This assertion uses .startsWith() instead of .equals() because task class loaders may
+            // be reused between tasks of the same job.
+            assert ((HadoopClassLoader)getClass().getClassLoader()).name()
+                .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true));
+
+            // We also cache Fs there, all them will be cleared explicitly upon the Job end.
+            fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        try (
+            FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
+
+            in.seek(split.offset());
+
+            String clsName = Text.readString(in);
+
+            Class<?> cls = jobConf().getClassByName(clsName);
+
+            assert cls != null;
+
+            Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls);
+
+            Deserializer deserializer = serialization.getDeserializer(cls);
+
+            deserializer.open(in);
+
+            Object res = deserializer.deserialize(null);
+
+            deserializer.close();
+
+            assert res != null;
+
+            return res;
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
+        String user = job.info().user();
+
+        user = IgfsUtils.fixUserName(user);
+
+        assert user != null;
+
+        String ugiUser;
+
+        try {
+            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+            assert currUser != null;
+
+            ugiUser = currUser.getShortUserName();
+        }
+        catch (IOException ioe) {
+            throw new IgniteCheckedException(ioe);
+        }
+
+        try {
+            if (F.eq(user, ugiUser))
+                // if current UGI context user is the same, do direct call:
+                return c.call();
+            else {
+                UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+                return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                    @Override public T run() throws Exception {
+                        return c.call();
+                    }
+                });
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
new file mode 100644
index 0000000..e612f1b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Optimized serialization for Hadoop {@link Writable} types.
+ */
+public class HadoopWritableSerialization implements HadoopSerialization {
+    /** */
+    private final Class<? extends Writable> cls;
+
+    /**
+     * @param cls Class.
+     */
+    public HadoopWritableSerialization(Class<? extends Writable> cls) {
+        assert cls != null;
+
+        this.cls = cls;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
+        assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass();
+
+        try {
+            ((Writable)obj).write(out);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
+        Writable w = obj == null ? U.newInstance(cls) : cls.cast(obj);
+
+        try {
+            w.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        return w;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index f3e17f3..bffb82b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
 import org.apache.ignite.internal.processors.hadoop.HadoopContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
@@ -58,14 +59,12 @@ import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
 import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskStatus;
 import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
 import org.apache.ignite.internal.util.GridMutex;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -153,23 +152,16 @@ public class HadoopJobTracker extends HadoopComponent {
 
         evtProcSvc = Executors.newFixedThreadPool(1);
 
-        UUID nodeId = ctx.localNodeId();
-
         assert jobCls == null;
 
-        String[] libNames = null;
-
-        if (ctx.configuration() != null)
-            libNames = ctx.configuration().getNativeLibraryNames();
-
-        HadoopClassLoader ldr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(nodeId), libNames);
+        HadoopClassLoader ldr = ctx.kernalContext().hadoopHelper().commonClassLoader();
 
         try {
-            jobCls = (Class<HadoopV2Job>)ldr.loadClass(HadoopV2Job.class.getName());
+            jobCls = (Class<HadoopJob>)ldr.loadClass(HadoopCommonUtils.JOB_CLS_NAME);
         }
         catch (Exception ioe) {
-            throw new IgniteCheckedException("Failed to load job class [class="
-                + HadoopV2Job.class.getName() + ']', ioe);
+            throw new IgniteCheckedException("Failed to load job class [class=" +
+                HadoopCommonUtils.JOB_CLS_NAME + ']', ioe);
         }
     }
 
@@ -903,7 +895,7 @@ public class HadoopJobTracker extends HadoopComponent {
                     ClassLoader ldr = job.getClass().getClassLoader();
 
                     try {
-                        String statWriterClsName = job.info().property(HadoopUtils.JOB_COUNTER_WRITER_PROPERTY);
+                        String statWriterClsName = job.info().property(HadoopCommonUtils.JOB_COUNTER_WRITER_PROPERTY);
 
                         if (statWriterClsName != null) {
                             Class<?> cls = ldr.loadClass(statWriterClsName);
@@ -1060,7 +1052,8 @@ public class HadoopJobTracker extends HadoopComponent {
                 jobInfo = meta.jobInfo();
             }
 
-            job = jobInfo.createJob(jobCls, jobId, log, ctx.configuration().getNativeLibraryNames());
+            job = jobInfo.createJob(jobCls, jobId, log, ctx.configuration().getNativeLibraryNames(),
+                ctx.kernalContext().hadoopHelper());
 
             job.initialize(false, ctx.localNodeId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
index 15c62c8..7aaf3fa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.processors.hadoop.planner;
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
 /**
  * Map-reduce plan.
  */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
deleted file mode 100644
index 5f96e08..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.QueueAclsInfo;
-import org.apache.hadoop.mapreduce.QueueInfo;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.mapreduce.TaskReport;
-import org.apache.hadoop.mapreduce.TaskTrackerInfo;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.security.token.Token;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.client.GridClient;
-import org.apache.ignite.internal.client.GridClientException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceCounters;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_SUBMISSION_START_TS_PROPERTY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.REQ_NEW_JOBID_TS_PROPERTY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.RESPONSE_NEW_JOBID_TS_PROPERTY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
-
-/**
- * Hadoop client protocol.
- */
-public class HadoopClientProtocol implements ClientProtocol {
-    /** Protocol version. */
-    private static final long PROTO_VER = 1L;
-
-    /** Default Ignite system directory. */
-    private static final String SYS_DIR = ".ignite/system";
-
-    /** Configuration. */
-    private final Configuration conf;
-
-    /** Ignite client. */
-    private volatile GridClient cli;
-
-    /** Last received version. */
-    private long lastVer = -1;
-
-    /** Last received status. */
-    private HadoopJobStatus lastStatus;
-
-    /**
-     * Constructor.
-     *
-     * @param conf Configuration.
-     * @param cli Ignite client.
-     */
-    public HadoopClientProtocol(Configuration conf, GridClient cli) {
-        assert cli != null;
-
-        this.conf = conf;
-        this.cli = cli;
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobID getNewJobID() throws IOException, InterruptedException {
-        try {
-            conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
-
-            HadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null);
-
-            conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
-
-            return new JobID(jobID.globalId().toString(), jobID.localId());
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to get new job ID.", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException,
-        InterruptedException {
-        try {
-            conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
-
-            HadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(),
-                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)));
-
-            if (status == null)
-                throw new IOException("Failed to submit job (null status obtained): " + jobId);
-
-            return processStatus(status);
-        }
-        catch (GridClientException | IgniteCheckedException e) {
-            throw new IOException("Failed to submit job.", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
-        return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException {
-        return Cluster.JobTrackerStatus.RUNNING;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public AccessControlList getQueueAdmins(String queueName) throws IOException {
-        return new AccessControlList("*");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void killJob(JobID jobId) throws IOException, InterruptedException {
-        try {
-            cli.compute().execute(HadoopProtocolKillJobTask.class.getName(),
-                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to kill job: " + jobId, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException,
-        InterruptedException {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException {
-        try {
-            Long delay = conf.getLong(HadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1);
-
-            HadoopProtocolTaskArguments args = delay >= 0 ?
-                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) :
-                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId());
-
-            HadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args);
-
-            if (status == null)
-                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
-
-            return processStatus(status);
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to get job status: " + jobId, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException {
-        try {
-            final HadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(),
-                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
-
-            if (counters == null)
-                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
-
-            return new HadoopMapReduceCounters(counters);
-        }
-        catch (GridClientException e) {
-            throw new IOException("Failed to get job counters: " + jobId, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException {
-        return new TaskReport[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getFilesystemName() throws IOException, InterruptedException {
-        return FileSystem.get(conf).getUri().toString();
-    }
-
-    /** {@inheritDoc} */
-    @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException {
-        return new JobStatus[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
-        throws IOException, InterruptedException {
-        return new TaskCompletionEvent[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException {
-        return new String[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException {
-        return new TaskTrackerInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException {
-        return new TaskTrackerInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getSystemDir() throws IOException, InterruptedException {
-        Path sysDir = new Path(SYS_DIR);
-
-        return sysDir.toString();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getStagingAreaDir() throws IOException, InterruptedException {
-        String usr = UserGroupInformation.getCurrentUser().getShortUserName();
-
-        return HadoopUtils.stagingAreaDir(conf, usr).toString();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getJobHistoryDir() throws IOException, InterruptedException {
-        return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo[] getQueues() throws IOException, InterruptedException {
-        return new QueueInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException {
-        return new QueueAclsInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
-        return new QueueInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException {
-        return new QueueInfo[0];
-    }
-
-    /** {@inheritDoc} */
-    @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException,
-        InterruptedException {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
-        InterruptedException {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
-        InterruptedException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException,
-        InterruptedException {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-        return PROTO_VER;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
-        throws IOException {
-        return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
-    }
-
-    /**
-     * Process received status update.
-     *
-     * @param status Ignite status.
-     * @return Hadoop status.
-     */
-    private JobStatus processStatus(HadoopJobStatus status) {
-        // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because
-        // IgniteHadoopClientProtocolProvider creates new instance of this class for every new job and Job class
-        // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will
-        // change in future and either protocol will serve statuses for several jobs or status update will not be
-        // serialized anymore, then we have to fallback to concurrent approach (e.g. using ConcurrentHashMap).
-        // (vozerov)
-        if (lastVer < status.version()) {
-            lastVer = status.version();
-
-            lastStatus = status;
-        }
-        else
-            assert lastStatus != null;
-
-        return HadoopUtils.status(lastStatus, conf);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
index 4a946e9..45d9a27 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -21,8 +21,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -44,7 +47,6 @@ import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.Hadoop
 import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopTaskFinishedMessage;
 import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopExternalCommunication;
 import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopMessageListener;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
@@ -126,6 +128,7 @@ public class HadoopChildProcessRunner {
      *
      * @param req Initialization request.
      */
+    @SuppressWarnings("unchecked")
     private void prepareProcess(HadoopPrepareForJobRequest req) {
         if (initGuard.compareAndSet(false, true)) {
             try {
@@ -134,7 +137,16 @@ public class HadoopChildProcessRunner {
 
                 assert job == null;
 
-                job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log, null);
+                Class jobCls;
+
+                try {
+                    jobCls = Class.forName(HadoopCommonUtils.JOB_CLS_NAME);
+                }
+                catch (ClassNotFoundException e) {
+                    throw new IgniteException("Failed to load job class: " + HadoopCommonUtils.JOB_CLS_NAME, e);
+                }
+
+                job = req.jobInfo().createJob(jobCls, req.jobId(), log, null, new HadoopHelperImpl());
 
                 job.initialize(true, nodeDesc.processId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
deleted file mode 100644
index 750b314..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import java.io.IOException;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
-
-/**
- * Hadoop cleanup task implementation for v1 API.
- */
-public class HadoopV1CleanupTask extends HadoopV1Task {
-    /** Abort flag. */
-    private final boolean abort;
-
-    /**
-     * @param taskInfo Task info.
-     * @param abort Abort flag.
-     */
-    public HadoopV1CleanupTask(HadoopTaskInfo taskInfo, boolean abort) {
-        super(taskInfo);
-
-        this.abort = abort;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
-
-        JobContext jobCtx = ctx.jobContext();
-
-        try {
-            OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter();
-
-            if (abort)
-                committer.abortJob(jobCtx, JobStatus.State.FAILED);
-            else
-                committer.commitJob(jobCtx);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
deleted file mode 100644
index c623eab..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Counter;
-
-import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
-
-/**
- * Hadoop counter implementation for v1 API.
- */
-public class HadoopV1Counter extends Counters.Counter {
-    /** Delegate. */
-    private final HadoopLongCounter cntr;
-
-    /**
-     * Creates new instance.
-     *
-     * @param cntr Delegate counter.
-     */
-    public HadoopV1Counter(HadoopLongCounter cntr) {
-        this.cntr = cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setDisplayName(String displayName) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getName() {
-        return cntr.name();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getDisplayName() {
-        return getName();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getValue() {
-        return cntr.value();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setValue(long val) {
-        cntr.value(val);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void increment(long incr) {
-        cntr.increment(incr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(DataOutput out) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readFields(DataInput in) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public String makeEscapedCompactString() {
-        return toEscapedCompactString(new HadoopV2Counter(cntr));
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("deprecation")
-    @Override public boolean contentEquals(Counters.Counter cntr) {
-        return getUnderlyingCounter().equals(cntr.getUnderlyingCounter());
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getCounter() {
-        return cntr.value();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter getUnderlyingCounter() {
-        return this;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
deleted file mode 100644
index fb2266a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
-
-/**
- * Hadoop map task implementation for v1 API.
- */
-public class HadoopV1MapTask extends HadoopV1Task {
-    /** */
-    private static final String[] EMPTY_HOSTS = new String[0];
-
-    /**
-     * Constructor.
-     *
-     * @param taskInfo 
-     */
-    public HadoopV1MapTask(HadoopTaskInfo taskInfo) {
-        super(taskInfo);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-        HadoopJob job = taskCtx.job();
-
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
-
-        JobConf jobConf = ctx.jobConf();
-
-        InputFormat inFormat = jobConf.getInputFormat();
-
-        HadoopInputSplit split = info().inputSplit();
-
-        InputSplit nativeSplit;
-
-        if (split instanceof HadoopFileBlock) {
-            HadoopFileBlock block = (HadoopFileBlock)split;
-
-            nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
-        }
-        else
-            nativeSplit = (InputSplit)ctx.getNativeSplit(split);
-
-        assert nativeSplit != null;
-
-        Reporter reporter = new HadoopV1Reporter(taskCtx);
-
-        HadoopV1OutputCollector collector = null;
-
-        try {
-            collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
-                fileName(), ctx.attemptId());
-
-            RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
-
-            Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
-
-            Object key = reader.createKey();
-            Object val = reader.createValue();
-
-            assert mapper != null;
-
-            try {
-                try {
-                    while (reader.next(key, val)) {
-                        if (isCancelled())
-                            throw new HadoopTaskCancelledException("Map task cancelled.");
-
-                        mapper.map(key, val, collector, reporter);
-                    }
-                }
-                finally {
-                    mapper.close();
-                }
-            }
-            finally {
-                collector.closeWriter();
-            }
-
-            collector.commit();
-        }
-        catch (Exception e) {
-            if (collector != null)
-                collector.abort();
-
-            throw new IgniteCheckedException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
deleted file mode 100644
index 37f81a6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import java.io.IOException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptContextImpl;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop output collector.
- */
-public class HadoopV1OutputCollector implements OutputCollector {
-    /** Job configuration. */
-    private final JobConf jobConf;
-
-    /** Task context. */
-    private final HadoopTaskContext taskCtx;
-
-    /** Optional direct writer. */
-    private final RecordWriter writer;
-
-    /** Task attempt. */
-    private final TaskAttemptID attempt;
-
-    /**
-     * @param jobConf Job configuration.
-     * @param taskCtx Task context.
-     * @param directWrite Direct write flag.
-     * @param fileName File name.
-     * @throws IOException In case of IO exception.
-     */
-    HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
-        @Nullable String fileName, TaskAttemptID attempt) throws IOException {
-        this.jobConf = jobConf;
-        this.taskCtx = taskCtx;
-        this.attempt = attempt;
-
-        if (directWrite) {
-            jobConf.set("mapreduce.task.attempt.id", attempt.toString());
-
-            OutputFormat outFormat = jobConf.getOutputFormat();
-
-            writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
-        }
-        else
-            writer = null;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void collect(Object key, Object val) throws IOException {
-        if (writer != null)
-            writer.write(key, val);
-        else {
-            try {
-                taskCtx.output().write(key, val);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IOException(e);
-            }
-        }
-    }
-
-    /**
-     * Close writer.
-     *
-     * @throws IOException In case of IO exception.
-     */
-    public void closeWriter() throws IOException {
-        if (writer != null)
-            writer.close(Reporter.NULL);
-    }
-
-    /**
-     * Setup task.
-     *
-     * @throws IOException If failed.
-     */
-    public void setup() throws IOException {
-        if (writer != null)
-            jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
-    }
-
-    /**
-     * Commit task.
-     *
-     * @throws IOException In failed.
-     */
-    public void commit() throws IOException {
-        if (writer != null) {
-            OutputCommitter outputCommitter = jobConf.getOutputCommitter();
-
-            TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
-
-            if (outputCommitter.needsTaskCommit(taskCtx))
-                outputCommitter.commitTask(taskCtx);
-        }
-    }
-
-    /**
-     * Abort task.
-     */
-    public void abort() {
-        try {
-            if (writer != null)
-                jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
-        }
-        catch (IOException ignore) {
-            // No-op.
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
deleted file mode 100644
index 0ab1bba..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
-
-/**
- * Hadoop partitioner adapter for v1 API.
- */
-public class HadoopV1Partitioner implements HadoopPartitioner {
-    /** Partitioner instance. */
-    private Partitioner<Object, Object> part;
-
-    /**
-     * @param cls Hadoop partitioner class.
-     * @param conf Job configuration.
-     */
-    public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) {
-        part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int partition(Object key, Object val, int parts) {
-        return part.getPartition(key, val, parts);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
deleted file mode 100644
index e656695..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
-
-/**
- * Hadoop reduce task implementation for v1 API.
- */
-public class HadoopV1ReduceTask extends HadoopV1Task {
-    /** {@code True} if reduce, {@code false} if combine. */
-    private final boolean reduce;
-
-    /**
-     * Constructor.
-     *
-     * @param taskInfo Task info.
-     * @param reduce {@code True} if reduce, {@code false} if combine.
-     */
-    public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) {
-        super(taskInfo);
-
-        this.reduce = reduce;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-        HadoopJob job = taskCtx.job();
-
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
-
-        JobConf jobConf = ctx.jobConf();
-
-        HadoopTaskInput input = taskCtx.input();
-
-        HadoopV1OutputCollector collector = null;
-
-        try {
-            collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
-
-            Reducer reducer;
-            if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
-                jobConf);
-            else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
-                jobConf);
-
-            assert reducer != null;
-
-            try {
-                try {
-                    while (input.next()) {
-                        if (isCancelled())
-                            throw new HadoopTaskCancelledException("Reduce task cancelled.");
-
-                        reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
-                    }
-                }
-                finally {
-                    reducer.close();
-                }
-            }
-            finally {
-                collector.closeWriter();
-            }
-
-            collector.commit();
-        }
-        catch (Exception e) {
-            if (collector != null)
-                collector.abort();
-
-            throw new IgniteCheckedException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
deleted file mode 100644
index 5a63aab..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
-
-/**
- * Hadoop reporter implementation for v1 API.
- */
-public class HadoopV1Reporter implements Reporter {
-    /** Context. */
-    private final HadoopTaskContext ctx;
-
-    /**
-     * Creates new instance.
-     *
-     * @param ctx Context.
-     */
-    public HadoopV1Reporter(HadoopTaskContext ctx) {
-        this.ctx = ctx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setStatus(String status) {
-        // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counters.Counter getCounter(Enum<?> name) {
-        return getCounter(name.getDeclaringClass().getName(), name.name());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counters.Counter getCounter(String grp, String name) {
-        return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void incrCounter(Enum<?> key, long amount) {
-        getCounter(key).increment(amount);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void incrCounter(String grp, String cntr, long amount) {
-        getCounter(grp, cntr).increment(amount);
-    }
-
-    /** {@inheritDoc} */
-    @Override public InputSplit getInputSplit() throws UnsupportedOperationException {
-        throw new UnsupportedOperationException("reporter has no input"); // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override public float getProgress() {
-        return 0.5f; // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override public void progress() {
-        // TODO
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
deleted file mode 100644
index d2f6823..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import java.io.IOException;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
-
-/**
- * Hadoop setup task implementation for v1 API.
- */
-public class HadoopV1SetupTask extends HadoopV1Task {
-    /**
-     * Constructor.
-     *
-     * @param taskInfo Task info.
-     */
-    public HadoopV1SetupTask(HadoopTaskInfo taskInfo) {
-        super(taskInfo);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
-
-        try {
-            ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf());
-
-            OutputCommitter committer = ctx.jobConf().getOutputCommitter();
-
-            if (committer != null)
-                committer.setupJob(ctx.jobContext());
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
deleted file mode 100644
index 203def4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop API v1 splitter.
- */
-public class HadoopV1Splitter {
-    /** */
-    private static final String[] EMPTY_HOSTS = {};
-
-    /**
-     * @param jobConf Job configuration.
-     * @return Collection of mapped splits.
-     * @throws IgniteCheckedException If mapping failed.
-     */
-    public static Collection<HadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException {
-        try {
-            InputFormat<?, ?> format = jobConf.getInputFormat();
-
-            assert format != null;
-
-            InputSplit[] splits = format.getSplits(jobConf, 0);
-
-            Collection<HadoopInputSplit> res = new ArrayList<>(splits.length);
-
-            for (int i = 0; i < splits.length; i++) {
-                InputSplit nativeSplit = splits[i];
-
-                if (nativeSplit instanceof FileSplit) {
-                    FileSplit s = (FileSplit)nativeSplit;
-
-                    res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
-                }
-                else
-                    res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
-            }
-
-            return res;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /**
-     * @param clsName Input split class name.
-     * @param in Input stream.
-     * @param hosts Optional hosts.
-     * @return File block or {@code null} if it is not a {@link FileSplit} instance.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
-        @Nullable String[] hosts) throws IgniteCheckedException {
-        if (!FileSplit.class.getName().equals(clsName))
-            return null;
-
-        FileSplit split = U.newInstance(FileSplit.class);
-
-        try {
-            split.readFields(in);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        if (hosts == null)
-            hosts = EMPTY_HOSTS;
-
-        return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
deleted file mode 100644
index a89323c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import java.io.IOException;
-import java.text.NumberFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.ignite.internal.processors.hadoop.HadoopTask;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Extended Hadoop v1 task.
- */
-public abstract class HadoopV1Task extends HadoopTask {
-    /** Indicates that this task is to be cancelled. */
-    private volatile boolean cancelled;
-
-    /**
-     * Constructor.
-     *
-     * @param taskInfo Task info.
-     */
-    protected HadoopV1Task(HadoopTaskInfo taskInfo) {
-        super(taskInfo);
-    }
-
-    /**
-     * Gets file name for that task result.
-     *
-     * @return File name.
-     */
-    public String fileName() {
-        NumberFormat numFormat = NumberFormat.getInstance();
-
-        numFormat.setMinimumIntegerDigits(5);
-        numFormat.setGroupingUsed(false);
-
-        return "part-" + numFormat.format(info().taskNumber());
-    }
-
-    /**
-     *
-     * @param jobConf Job configuration.
-     * @param taskCtx Task context.
-     * @param directWrite Direct write flag.
-     * @param fileName File name.
-     * @param attempt Attempt of task.
-     * @return Collector.
-     * @throws IOException In case of IO exception.
-     */
-    protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx,
-        boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException {
-        HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite,
-            fileName, attempt) {
-            /** {@inheritDoc} */
-            @Override public void collect(Object key, Object val) throws IOException {
-                if (cancelled)
-                    throw new HadoopTaskCancelledException("Task cancelled.");
-
-                super.collect(key, val);
-            }
-        };
-
-        collector.setup();
-
-        return collector;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancel() {
-        cancelled = true;
-    }
-
-    /** Returns true if task is cancelled. */
-    public boolean isCancelled() {
-        return cancelled;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
deleted file mode 100644
index 9632525..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopDaemon.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.util.Collection;
-import java.util.LinkedList;
-
-/**
- * Replacement for Hadoop {@code org.apache.hadoop.util.Daemon} class.
- */
-@SuppressWarnings("UnusedDeclaration")
-public class HadoopDaemon extends Thread {
-    /** Lock object used for synchronization. */
-    private static final Object lock = new Object();
-
-    /** Collection to hold the threads to be stopped. */
-    private static Collection<HadoopDaemon> daemons = new LinkedList<>();
-
-    {
-        setDaemon(true); // always a daemon
-    }
-
-    /** Runnable of this thread, may be this. */
-    final Runnable runnable;
-
-    /**
-     * Construct a daemon thread.
-     */
-    public HadoopDaemon() {
-        super();
-
-        runnable = this;
-
-        enqueueIfNeeded();
-    }
-
-    /**
-     * Construct a daemon thread.
-     */
-    public HadoopDaemon(Runnable runnable) {
-        super(runnable);
-
-        this.runnable = runnable;
-
-        this.setName(runnable.toString());
-
-        enqueueIfNeeded();
-    }
-
-    /**
-     * Construct a daemon thread to be part of a specified thread group.
-     */
-    public HadoopDaemon(ThreadGroup grp, Runnable runnable) {
-        super(grp, runnable);
-
-        this.runnable = runnable;
-
-        this.setName(runnable.toString());
-
-        enqueueIfNeeded();
-    }
-
-    /**
-     * Getter for the runnable. May return this.
-     *
-     * @return the runnable
-     */
-    public Runnable getRunnable() {
-        return runnable;
-    }
-
-    /**
-     * if the runnable is a Hadoop org.apache.hadoop.hdfs.PeerCache Runnable.
-     *
-     * @param r the runnable.
-     * @return true if it is.
-     */
-    private static boolean isPeerCacheRunnable(Runnable r) {
-        String name = r.getClass().getName();
-
-        return name.startsWith("org.apache.hadoop.hdfs.PeerCache");
-    }
-
-    /**
-     * Enqueue this thread if it should be stopped upon the task end.
-     */
-    private void enqueueIfNeeded() {
-        synchronized (lock) {
-            if (daemons == null)
-                throw new RuntimeException("Failed to create HadoopDaemon (its registry is already cleared): " +
-                    "[classLoader=" + getClass().getClassLoader() + ']');
-
-            if (runnable.getClass().getClassLoader() == getClass().getClassLoader() && isPeerCacheRunnable(runnable))
-                daemons.add(this);
-        }
-    }
-
-    /**
-     * Stops all the registered threads.
-     */
-    public static void dequeueAndStopAll() {
-        synchronized (lock) {
-            if (daemons != null) {
-                for (HadoopDaemon daemon : daemons)
-                    daemon.interrupt();
-
-                daemons = null;
-            }
-        }
-    }
-}
\ No newline at end of file


Mime
View raw message