ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [20/51] [partial] ignite git commit: IGNITE-3916: Created separate module.
Date Mon, 19 Sep 2016 10:50:50 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
deleted file mode 100644
index bd8ed2d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ /dev/null
@@ -1,1076 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.fs.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.AbstractFileSystem;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.FsStatus;
-import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.Progressable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsMode;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper;
-import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
-import org.apache.ignite.internal.processors.igfs.IgfsPaths;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
-import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
-import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser;
-import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME;
-
-/**
- * {@code IGFS} Hadoop 2.x file system driver over file system API. To use
- * {@code IGFS} as Hadoop file system, you should configure this class
- * in Hadoop's {@code core-site.xml} as follows:
- * <pre name="code" class="xml">
- *  &lt;property&gt;
- *      &lt;name&gt;fs.default.name&lt;/name&gt;
- *      &lt;value&gt;igfs://ipc&lt;/value&gt;
- *  &lt;/property&gt;
- *
- *  &lt;property&gt;
- *      &lt;name&gt;fs.igfs.impl&lt;/name&gt;
- *      &lt;value&gt;org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem&lt;/value&gt;
- *  &lt;/property&gt;
- * </pre>
- * You should also add the Ignite JAR and all of its libraries to the Hadoop classpath. To
- * do this, add the following lines to the {@code conf/hadoop-env.sh} script in the Hadoop
- * distribution:
- * <pre name="code" class="bash">
- * export IGNITE_HOME=/path/to/Ignite/distribution
- * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
- *
- * for f in $IGNITE_HOME/libs/*.jar; do
- *  export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
- * done
- * </pre>
- * <h1 class="header">Data vs Client Nodes</h1>
- * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on
- * data nodes. Client nodes are responsible for basic file system operations as well as
- * accessing data nodes remotely. Usually, client nodes are started together
- * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually
- * started together with Hadoop {@code task-tracker} processes.
- * <p>
- * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml}
- * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation.
- */
-public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closeable {
-    /** Commons Logging logger for this class. */
-    private static final Log LOG = LogFactory.getLog(IgniteHadoopFileSystem.class);
-
-    /**
-     * Ensures that close routine is invoked at most once. Once set, {@code enterBusy()} rejects
-     * further operations with an {@link IOException}.
-     */
-    private final AtomicBoolean closeGuard = new AtomicBoolean();
-
-    /** Grid remote client; created during initialization and reset to {@code null} on close. */
-    private HadoopIgfsWrapper rmtClient;
-
-    /** The name of the user on whose behalf this file system was created. */
-    private final String user;
-
-    /** Working directory; set by the constructor to {@code /user/<user>}. */
-    private IgfsPath workingDir;
-
-    /** URI this file system was created with (as passed to the constructor). */
-    private final URI uri;
-
-    /** Authority part of the file system URI; absolute paths are checked against it. */
-    private String uriAuthority;
-
-    /** Client logger; a disabled logger is used when sampling/logging is switched off. */
-    private IgfsLogger clientLog;
-
-    /** Server block size as reported by the handshake response. */
-    private long grpBlockSize;
-
-    /** Default replication factor, read from {@code dfs.replication} (3 if not configured). */
-    private short dfltReplication;
-
-    /** Secondary file system URI; set only when a secondary file system is initialized. */
-    private URI secondaryUri;
-
-    /** Mode resolver used to pick the IGFS mode for a given path. */
-    private IgfsModeResolver modeRslvr;
-
-    /** The secondary file system factory; stays {@code null} when no path works in PROXY mode. */
-    private HadoopFileSystemFactory factory;
-
-    /** Whether custom sequential reads before prefetch value is provided (i.e. configured value is positive). */
-    private boolean seqReadsBeforePrefetchOverride;
-
-    /** Custom-provided sequential reads before prefetch. */
-    private int seqReadsBeforePrefetch;
-
-    /** Flag that controls whether file writes should be colocated on data node. */
-    private boolean colocateFileWrites;
-
-    /** Prefer local writes; passed to the server as a file property on create/append. */
-    private boolean preferLocFileWrites;
-
-    /**
-     * Creates the file system for the given URI and runs {@code initialize(name, cfg)}.
-     * <p>
-     * If initialization fails for any reason (checked or unchecked exception), the remote client that
-     * may already have been created is closed before the failure is propagated, so that a
-     * half-constructed instance does not leak it.
-     *
-     * @param name URI for file system.
-     * @param cfg Configuration.
-     * @throws URISyntaxException if name has invalid syntax.
-     * @throws IOException If initialization failed.
-     */
-    public IgniteHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException {
-        super(HadoopIgfsEndpoint.normalize(name), IGFS_SCHEME, false, -1);
-
-        uri = name;
-
-        user = getFsHadoopUser();
-
-        boolean initialized = false;
-
-        try {
-            initialize(name, cfg);
-
-            initialized = true;
-        }
-        finally {
-            // Close client on every failure path, not only when IOException occurred.
-            if (!initialized && rmtClient != null)
-                rmtClient.close(false);
-        }
-
-        workingDir = new IgfsPath("/user/" + user);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void checkPath(Path path) {
-        URI uri = path.toUri();
-
-        if (uri.isAbsolute()) {
-            if (!F.eq(uri.getScheme(), IGFS_SCHEME))
-                throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" +
-                    uri.getAuthority() + ']');
-
-            if (!F.eq(uri.getAuthority(), uriAuthority))
-                throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" +
-                    uri.getAuthority() + ']');
-        }
-    }
-
-    /**
-     * Overrides the colocation flag applied to files created after this call. Intended for
-     * direct users of the file system or Visor.
-     *
-     * @param colocateFileWrites {@code True} if file writes should be colocated.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void colocateFileWrites(boolean colocateFileWrites) {
-        this.colocateFileWrites = colocateFileWrites;
-    }
-
-    /**
-     * Marks the beginning of a file system operation; fails once the file system has been closed.
-     *
-     * @throws IOException If the close guard has already been set.
-     */
-    private void enterBusy() throws IOException {
-        boolean stopped = closeGuard.get();
-
-        if (stopped)
-            throw new IOException("File system is stopped.");
-    }
-
-    /**
-     * Marks the end of a file system operation. Counterpart of {@code enterBusy()};
-     * currently there is nothing to release.
-     */
-    private void leaveBusy() {
-        // Intentionally empty.
-    }
-
-    /**
-     * Initializes this file system: validates the URI, reads the tuning parameters from the
-     * configuration, creates the remote client, performs the handshake, sets up the client logger
-     * and the mode resolver, and, if at least one path works in {@code PROXY} mode, obtains and
-     * starts the secondary file system factory.
-     *
-     * @param name URI passed to constructor.
-     * @param cfg Configuration passed to constructor.
-     * @throws IOException If initialization failed.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private void initialize(URI name, Configuration cfg) throws IOException {
-        enterBusy();
-
-        try {
-            if (rmtClient != null)
-                throw new IOException("File system is already initialized: " + rmtClient);
-
-            A.notNull(name, "name");
-            A.notNull(cfg, "cfg");
-
-            if (!IGFS_SCHEME.equals(name.getScheme()))
-                throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME +
-                    "://[name]/[optional_path], actual=" + name + ']');
-
-            uriAuthority = name.getAuthority();
-
-            // Override sequential reads before prefetch if needed (only positive values count as override).
-            seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
-
-            if (seqReadsBeforePrefetch > 0)
-                seqReadsBeforePrefetchOverride = true;
-
-            // In Ignite replication factor is controlled by data cache affinity.
-            // We use replication factor to force the whole file to be stored on local node.
-            dfltReplication = (short)cfg.getInt("dfs.replication", 3);
-
-            // Get file colocation control flag.
-            colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false);
-            preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false);
-
-            // Get log directory.
-            String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR);
-
-            File logDirFile = U.resolveIgnitePath(logDirCfg);
-
-            String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
-
-            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
-
-            // Handshake.
-            IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
-
-            grpBlockSize = handshake.blockSize();
-
-            IgfsPaths paths = handshake.secondaryPaths();
-
-            Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false);
-
-            // Sampling flag from the handshake, when present, takes precedence over the local setting.
-            if (handshake.sampling() != null ? handshake.sampling() : logEnabled) {
-                // Initiate client logger.
-                if (logDir == null)
-                    throw new IOException("Failed to resolve log directory: " + logDirCfg);
-
-                Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE);
-
-                clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize);
-            }
-            else
-                clientLog = IgfsLogger.disabledLogger();
-
-            try {
-                modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes());
-            }
-            catch (IgniteCheckedException ice) {
-                throw new IOException(ice);
-            }
-
-            // Secondary file system is needed only if the default mode or any path mode is PROXY.
-            boolean initSecondary = paths.defaultMode() == PROXY;
-
-            if (!initSecondary && paths.pathModes() != null) {
-                for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
-                    IgfsMode mode = pathMode.getValue();
-
-                    if (mode == PROXY) {
-                        initSecondary = true;
-
-                        break;
-                    }
-                }
-            }
-
-            if (initSecondary) {
-                try {
-                    factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IOException("Failed to get secondary file system factory.", e);
-                }
-
-                if (factory == null)
-                    throw new IOException("Failed to get secondary file system factory (did you set " +
-                        IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFileSystem\" in " +
-                        FileSystemConfiguration.class.getName() + "?)");
-
-                if (factory instanceof LifecycleAware)
-                    ((LifecycleAware) factory).start();
-
-                try {
-                    FileSystem secFs = factory.get(user);
-
-                    secondaryUri = secFs.getUri();
-
-                    A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
-                }
-                catch (IOException e) {
-                    throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
-                }
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IOException {
-        if (closeGuard.compareAndSet(false, true)) {
-            if (rmtClient == null)
-                return;
-
-            rmtClient.close(false);
-
-            if (clientLog.isLogEnabled())
-                clientLog.close();
-
-            if (factory instanceof LifecycleAware)
-                ((LifecycleAware) factory).stop();
-
-            // Reset initialized resources.
-            rmtClient = null;
-        }
-    }
-
-    /**
-     * @return URI this file system was created with.
-     */
-    @Override public URI getUri() {
-        return this.uri;
-    }
-
-    /**
-     * @return Always {@code -1}, the same value that the constructor passes to the super class
-     *      as the default port.
-     */
-    @Override public int getUriDefaultPort() {
-        return -1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FsServerDefaults getServerDefaults() throws IOException {
-        return new FsServerDefaults(grpBlockSize, (int)grpBlockSize, (int)grpBlockSize, dfltReplication, 64 * 1024,
-            false, 0, DataChecksum.Type.NULL);
-    }
-
-    /**
-     * Sets the replication factor. Only paths in {@code PROXY} mode are affected: the request is
-     * delegated to the secondary file system. For all other modes nothing is done.
-     *
-     * @param f Path.
-     * @param replication Replication factor.
-     * @return Result of the secondary file system call for {@code PROXY} paths, {@code false} otherwise.
-     * @throws IOException If the secondary file system call failed.
-     */
-    @Override public boolean setReplication(Path f, short replication) throws IOException {
-        if (mode(f) != PROXY)
-            return false;
-
-        // Secondary file system must be addressed with its own scheme and authority.
-        return secondaryFileSystem().setReplication(toSecondary(f), replication);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setTimes(Path f, long mtime, long atime) throws IOException {
-        if (mode(f) == PROXY)
-            secondaryFileSystem().setTimes(f, mtime, atime);
-        else {
-            if (mtime == -1 && atime == -1)
-                return;
-
-            rmtClient.setTimes(convert(f), atime, mtime);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public FsStatus getFsStatus() throws IOException {
-        IgfsStatus status = rmtClient.fsStatus();
-
-        return new FsStatus(status.spaceTotal(), status.spaceUsed(), status.spaceTotal() - status.spaceUsed());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setPermission(Path p, FsPermission perm) throws IOException {
-        enterBusy();
-
-        try {
-            A.notNull(p, "p");
-
-            if (mode(p) == PROXY)
-                secondaryFileSystem().setPermission(toSecondary(p), perm);
-            else {
-                if (rmtClient.update(convert(p), permission(perm)) == null)
-                    throw new IOException("Failed to set file permission (file not found?)" +
-                        " [path=" + p + ", perm=" + perm + ']');
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Sets owner and group of a path. {@code PROXY} paths are delegated to the secondary file
-     * system; for other modes user and group names are written as file properties through the
-     * remote client.
-     *
-     * @param p Path.
-     * @param usr User name, must not be {@code null}.
-     * @param grp Group name, must not be {@code null}.
-     * @throws IOException If the file system is stopped or the remote update returned {@code null}.
-     */
-    @Override public void setOwner(Path p, String usr, String grp) throws IOException {
-        A.notNull(p, "p");
-        A.notNull(usr, "username");
-        A.notNull(grp, "grpName");
-
-        enterBusy();
-
-        try {
-            if (mode(p) == PROXY)
-                secondaryFileSystem().setOwner(toSecondary(p), usr, grp);
-            else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, usr,
-                IgfsUtils.PROP_GROUP_NAME, grp)) == null) {
-                // The message names the operation that actually failed (owner, not permission).
-                throw new IOException("Failed to set file owner (file not found?)" +
-                    " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Opens a file for reading. For {@code PROXY} paths the stream of the secondary file system
-     * is returned (wrapped into a logging proxy when client logging is enabled); otherwise the
-     * file is opened through the remote client.
-     *
-     * @param f Path of the file to open.
-     * @param bufSize Buffer size.
-     * @return Input stream.
-     * @throws IOException If the file system is stopped or the open failed.
-     */
-    @Override public FSDataInputStream open(Path f, int bufSize) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath igfsPath = convert(f);
-            IgfsMode pathMode = modeRslvr.resolveMode(igfsPath);
-
-            if (pathMode == PROXY) {
-                FSDataInputStream secIn = secondaryFileSystem().open(toSecondary(f), bufSize);
-
-                if (!clientLog.isLogEnabled())
-                    return secIn;
-
-                // File size is unknown here, so one more request to the secondary FS is made to get it.
-                FileStatus secStatus = secondaryFileSystem().getFileStatus(toSecondary(f));
-
-                long len = secStatus == null ? -1 : secStatus.getLen();
-
-                long proxyLogId = IgfsLogger.nextId();
-
-                clientLog.logOpen(proxyLogId, igfsPath, PROXY, bufSize, len);
-
-                return new FSDataInputStream(new HadoopIgfsProxyInputStream(secIn, clientLog, proxyLogId));
-            }
-
-            HadoopIgfsStreamDelegate delegate;
-
-            if (seqReadsBeforePrefetchOverride)
-                delegate = rmtClient.open(igfsPath, seqReadsBeforePrefetch);
-            else
-                delegate = rmtClient.open(igfsPath);
-
-            long logId = -1;
-
-            if (clientLog.isLogEnabled()) {
-                logId = IgfsLogger.nextId();
-
-                clientLog.logOpen(logId, igfsPath, pathMode, bufSize, delegate.length());
-            }
-
-            boolean debug = LOG.isDebugEnabled();
-
-            if (debug)
-                LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + igfsPath +
-                    ", bufSize=" + bufSize + ']');
-
-            HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(delegate, delegate.length(),
-                bufSize, LOG, clientLog, logId);
-
-            if (LOG.isDebugEnabled())
-                LOG.debug("Opened input stream [path=" + igfsPath + ", delegate=" + delegate + ']');
-
-            return new FSDataInputStream(igfsIn);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Creates a file or opens it for append. For {@code PROXY} paths the call is delegated to the
-     * secondary file system (with a logging proxy when client logging is enabled); otherwise the
-     * stream is obtained from the remote client and wrapped into a buffered output stream of at
-     * least 64 KB.
-     *
-     * @param f Path of the file.
-     * @param flag Create flags; {@code OVERWRITE}, {@code APPEND} and {@code CREATE} are inspected.
-     * @param perm Permission for the file.
-     * @param bufSize Requested buffer size.
-     * @param replication Replication factor.
-     * @param blockSize Block size.
-     * @param progress Progress callback; passed to the secondary file system only.
-     * @param checksumOpt Checksum options; not used by this implementation.
-     * @param createParent Create-parent flag; not used by this implementation.
-     * @return Output stream.
-     * @throws IOException If the file system is stopped or the stream could not be created.
-     */
-    @SuppressWarnings("deprecation")
-    @Override public FSDataOutputStream createInternal(
-        Path f,
-        EnumSet<CreateFlag> flag,
-        FsPermission perm,
-        int bufSize,
-        short replication,
-        long blockSize,
-        Progressable progress,
-        Options.ChecksumOpt checksumOpt,
-        boolean createParent
-    ) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
-        boolean append = flag.contains(CreateFlag.APPEND);
-        boolean create = flag.contains(CreateFlag.CREATE);
-
-        OutputStream out = null;
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-
-            if (LOG.isDebugEnabled())
-                LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + ", path=" +
-                    path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
-
-            if (mode == PROXY) {
-                FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize,
-                    replication, blockSize, progress);
-
-                if (clientLog.isLogEnabled()) {
-                    long logId = IgfsLogger.nextId();
-
-                    if (append)
-                        clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID.
-                    else
-                        clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize);
-
-                    return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId));
-                }
-                else
-                    return os;
-            }
-            else {
-                Map<String, String> permMap = F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm),
-                    IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
-
-                // Remote stream delegate; note that only the buffered wrapper assigned to 'out' below
-                // is closed by the 'finally' section.
-                HadoopIgfsStreamDelegate stream;
-
-                long logId = -1;
-
-                if (append) {
-                    stream = rmtClient.append(path, create, permMap);
-
-                    if (clientLog.isLogEnabled()) {
-                        logId = IgfsLogger.nextId();
-
-                        clientLog.logAppend(logId, path, mode, bufSize);
-                    }
-
-                    if (LOG.isDebugEnabled())
-                        LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']');
-                }
-                else {
-                    stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize,
-                        permMap);
-
-                    if (clientLog.isLogEnabled()) {
-                        logId = IgfsLogger.nextId();
-
-                        clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize);
-                    }
-
-                    if (LOG.isDebugEnabled())
-                        LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']');
-                }
-
-                assert stream != null;
-
-                HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG,
-                    clientLog, logId);
-
-                // Never buffer less than 64 KB.
-                bufSize = Math.max(64 * 1024, bufSize);
-
-                out = new BufferedOutputStream(igfsOut, bufSize);
-
-                FSDataOutputStream res = new FSDataOutputStream(out, null, 0);
-
-                // Mark stream created successfully.
-                out = null;
-
-                return res;
-            }
-        }
-        finally {
-            // Close if failed during stream creation.
-            if (out != null)
-                U.closeQuiet(out);
-
-            leaveBusy();
-        }
-    }
-
-    /**
-     * @return Always {@code false}: this implementation reports no symlink support.
-     */
-    @Override public boolean supportsSymlinks() {
-        return false;
-    }
-
-    /**
-     * Renames a path. The mode is resolved by the source path: {@code PROXY} sources are renamed
-     * in the secondary file system, all others through the remote client. The rename is written
-     * to the client log (when enabled) before it is executed.
-     *
-     * @param src Source path.
-     * @param dst Destination path.
-     * @throws IOException If the file system is stopped or the rename failed.
-     */
-    @Override public void renameInternal(Path src, Path dst) throws IOException {
-        A.notNull(src, "src");
-        A.notNull(dst, "dst");
-
-        enterBusy();
-
-        try {
-            IgfsPath from = convert(src);
-            IgfsPath to = convert(dst);
-
-            IgfsMode fromMode = modeRslvr.resolveMode(from);
-
-            if (clientLog.isLogEnabled())
-                clientLog.logRename(from, fromMode, to);
-
-            if (fromMode != PROXY)
-                rmtClient.rename(from, to);
-            else
-                secondaryFileSystem().rename(toSecondary(src), toSecondary(dst));
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Deletes a path. {@code PROXY} paths are logged first and then deleted in the secondary file
-     * system; other paths are deleted through the remote client and logged afterwards.
-     *
-     * @param f Path to delete.
-     * @param recursive Whether to delete recursively.
-     * @return Result reported by the file system that performed the delete.
-     * @throws IOException If the file system is stopped or the delete failed.
-     */
-    @Override public boolean delete(Path f, boolean recursive) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath igfsPath = convert(f);
-
-            IgfsMode pathMode = modeRslvr.resolveMode(igfsPath);
-
-            boolean deleted;
-
-            if (pathMode == PROXY) {
-                if (clientLog.isLogEnabled())
-                    clientLog.logDelete(igfsPath, PROXY, recursive);
-
-                deleted = secondaryFileSystem().delete(toSecondary(f), recursive);
-            }
-            else {
-                deleted = rmtClient.delete(igfsPath, recursive);
-
-                if (clientLog.isLogEnabled())
-                    clientLog.logDelete(igfsPath, pathMode, recursive);
-            }
-
-            return deleted;
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Passes the checksum verification flag to the secondary file system. Does nothing when no
-     * secondary file system factory is configured.
-     *
-     * @param verifyChecksum Verify checksum flag.
-     * @throws IOException If the secondary file system call failed.
-     */
-    @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException {
-        // Without a secondary FS the flag has no effect.
-        if (factory == null)
-            return;
-
-        secondaryFileSystem().setVerifyChecksum(verifyChecksum);
-    }
-
-    /**
-     * Gets the checksum of a file. Only {@code PROXY} paths have one: it is requested from the
-     * secondary file system. For all other modes {@code null} is returned.
-     *
-     * @param f Path of the file.
-     * @return Checksum from the secondary file system or {@code null} for non-{@code PROXY} paths.
-     * @throws IOException If the secondary file system call failed.
-     */
-    @Override public FileChecksum getFileChecksum(Path f) throws IOException {
-        if (mode(f) != PROXY)
-            return null;
-
-        // Secondary file system must be addressed with its own scheme and authority.
-        return secondaryFileSystem().getFileChecksum(toSecondary(f));
-    }
-
-    /**
-     * Lists the statuses of the entries under the given path. For {@code PROXY} paths the listing
-     * comes from the secondary file system and every entry is converted to the primary form;
-     * otherwise the remote client is asked and every entry is converted to a Hadoop status.
-     *
-     * @param f Path to list.
-     * @return Statuses of the listed entries.
-     * @throws FileNotFoundException If the underlying listing returned {@code null}.
-     * @throws IOException If the file system is stopped or the listing failed.
-     */
-    @Override public FileStatus[] listStatus(Path f) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath igfsPath = convert(f);
-            IgfsMode pathMode = modeRslvr.resolveMode(igfsPath);
-
-            FileStatus[] statuses;
-
-            if (pathMode == PROXY) {
-                statuses = secondaryFileSystem().listStatus(toSecondary(f));
-
-                if (statuses == null)
-                    throw new FileNotFoundException("File " + f + " does not exist.");
-
-                for (int idx = 0; idx < statuses.length; idx++)
-                    statuses[idx] = toPrimary(statuses[idx]);
-            }
-            else {
-                Collection<IgfsFile> listed = rmtClient.listFiles(igfsPath);
-
-                if (listed == null)
-                    throw new FileNotFoundException("File " + f + " does not exist.");
-
-                List<IgfsFile> snapshot = new ArrayList<>(listed);
-
-                statuses = new FileStatus[snapshot.size()];
-
-                int idx = 0;
-
-                for (IgfsFile file : snapshot)
-                    statuses[idx++] = convert(file);
-            }
-
-            if (clientLog.isLogEnabled()) {
-                String[] names = new String[statuses.length];
-
-                for (int idx = 0; idx < names.length; idx++)
-                    names[idx] = statuses[idx].getPath().toString();
-
-                // In the PROXY branch the resolved mode equals PROXY, so one call serves both branches.
-                clientLog.logListDirectory(igfsPath, pathMode, names);
-            }
-
-            return statuses;
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void mkdir(Path f, FsPermission perm, boolean createParent) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-
-            if (mode == PROXY) {
-                if (clientLog.isLogEnabled())
-                    clientLog.logMakeDirectory(path, PROXY);
-
-                secondaryFileSystem().mkdirs(toSecondary(f), perm);
-            }
-            else {
-                rmtClient.mkdirs(path, permission(perm));
-
-                if (clientLog.isLogEnabled())
-                    clientLog.logMakeDirectory(path, mode);
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileStatus(Path f) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            if (mode(f) == PROXY)
-                return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f)));
-            else {
-                IgfsFile info = rmtClient.info(convert(f));
-
-                if (info == null)
-                    throw new FileNotFoundException("File not found: " + f);
-
-                return convert(info);
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Gets block locations for a range of a file. {@code PROXY} paths are delegated to the
-     * secondary file system; otherwise affinity is requested from the remote client and each
-     * entry is converted to a Hadoop block location.
-     *
-     * @param path Path of the file.
-     * @param start Start offset of the range.
-     * @param len Length of the range.
-     * @return Block locations.
-     * @throws IOException If the file system is stopped or the request failed.
-     */
-    @Override public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException {
-        A.notNull(path, "path");
-
-        IgfsPath igfsPath = convert(path);
-
-        enterBusy();
-
-        try {
-            if (modeRslvr.resolveMode(igfsPath) == PROXY) {
-                // Secondary file system must be addressed with its own scheme and authority.
-                return secondaryFileSystem().getFileBlockLocations(toSecondary(path), start, len);
-            }
-            else {
-                // Remember the start time to report fetch duration in the debug message.
-                long now = System.currentTimeMillis();
-
-                List<IgfsBlockLocation> affinity = new ArrayList<>(
-                    rmtClient.affinity(igfsPath, start, len));
-
-                BlockLocation[] arr = new BlockLocation[affinity.size()];
-
-                for (int i = 0; i < arr.length; i++)
-                    arr[i] = convert(affinity.get(i));
-
-                if (LOG.isDebugEnabled())
-                    LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" +
-                        (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']');
-
-                return arr;
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Resolve path mode.
-     *
-     * @param path HDFS path.
-     * @return Path mode.
-     */
-    public IgfsMode mode(Path path) {
-        return modeRslvr.resolveMode(convert(path));
-    }
-
-    /**
-     * Convert the given path to path acceptable by the primary file system.
-     *
-     * @param path Path.
-     * @return Primary file system path.
-     */
-    private Path toPrimary(Path path) {
-        return convertPath(path, getUri());
-    }
-
-    /**
-     * Convert the given path to path acceptable by the secondary file system.
-     *
-     * @param path Path.
-     * @return Secondary file system path.
-     */
-    private Path toSecondary(Path path) {
-        assert factory != null;
-        assert secondaryUri != null;
-
-        return convertPath(path, secondaryUri);
-    }
-
-    /**
-     * Convert path using the given new URI.
-     *
-     * @param path Old path.
-     * @param newUri New URI.
-     * @return New path.
-     */
-    private Path convertPath(Path path, URI newUri) {
-        assert newUri != null;
-
-        if (path != null) {
-            URI pathUri = path.toUri();
-
-            try {
-                return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null,
-                    pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null));
-            }
-            catch (URISyntaxException e) {
-                throw new IgniteException("Failed to construct secondary file system path from the primary file " +
-                    "system path: " + path, e);
-            }
-        }
-        else
-            return null;
-    }
-
-    /**
-     * Convert a file status obtained from the secondary file system to a status of the primary file system.
-     *
-     * @param status Secondary file system status.
-     * @return Primary file system status.
-     */
-    private FileStatus toPrimary(FileStatus status) {
-        return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(),
-            status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(),
-            status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null;
-    }
-
-    /**
-     * Convert IGFS path into Hadoop path.
-     *
-     * @param path IGFS path.
-     * @return Hadoop path.
-     */
-    private Path convert(IgfsPath path) {
-        return new Path(IGFS_SCHEME, uriAuthority, path.toString());
-    }
-
-    /**
-     * Convert Hadoop path into IGFS path.
-     *
-     * @param path Hadoop path.
-     * @return IGFS path.
-     */
-    @Nullable private IgfsPath convert(Path path) {
-        if (path == null)
-            return null;
-
-        return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
-            new IgfsPath(workingDir, path.toUri().getPath());
-    }
-
-    /**
-     * Convert IGFS affinity block location into Hadoop affinity block location.
-     *
-     * @param block IGFS affinity block location.
-     * @return Hadoop affinity block location.
-     */
-    private BlockLocation convert(IgfsBlockLocation block) {
-        Collection<String> names = block.names();
-        Collection<String> hosts = block.hosts();
-
-        return new BlockLocation(
-            names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */,
-            hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */,
-            block.start(), block.length()
-        ) {
-            @Override public String toString() {
-                try {
-                    return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() +
-                        ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']';
-                }
-                catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-    }
-
-    /**
-     * Convert IGFS file information into Hadoop file status.
-     *
-     * @param file IGFS file information.
-     * @return Hadoop file status.
-     */
-    private FileStatus convert(IgfsFile file) {
-        return new FileStatus(
-            file.length(),
-            file.isDirectory(),
-            dfltReplication,
-            file.groupBlockSize(),
-            file.modificationTime(),
-            file.accessTime(),
-            permission(file),
-            file.property(IgfsUtils.PROP_USER_NAME, user),
-            file.property(IgfsUtils.PROP_GROUP_NAME, "users"),
-            convert(file.path())) {
-            @Override public String toString() {
-                return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]";
-            }
-        };
-    }
-
-    /**
-     * Convert Hadoop permission into IGFS file attribute.
-     *
-     * @param perm Hadoop permission.
-     * @return IGFS attributes.
-     */
-    private Map<String, String> permission(FsPermission perm) {
-        if (perm == null)
-            perm = FsPermission.getDefault();
-
-        return F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm));
-    }
-
-    /**
-     * @param perm Permission.
-     * @return String.
-     */
-    private static String toString(FsPermission perm) {
-        return String.format("%04o", perm.toShort());
-    }
-
-    /**
-     * Convert IGFS file attributes into Hadoop permission.
-     *
-     * @param file File info.
-     * @return Hadoop permission.
-     */
-    private FsPermission permission(IgfsFile file) {
-        String perm = file.property(IgfsUtils.PROP_PERMISSION, null);
-
-        if (perm == null)
-            return FsPermission.getDefault();
-
-        try {
-            return new FsPermission((short)Integer.parseInt(perm, 8));
-        }
-        catch (NumberFormatException ignore) {
-            return FsPermission.getDefault();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteHadoopFileSystem.class, this);
-    }
-
-    /**
-     * Returns the user name this File System is created on behalf of.
-     * @return the user name
-     */
-    public String user() {
-        return user;
-    }
-
-    /**
-     * Gets cached or creates a {@link FileSystem}.
-     *
-     * @return The secondary file system.
-     */
-    private FileSystem secondaryFileSystem() throws IOException{
-        assert factory != null;
-
-        return factory.get(user);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
deleted file mode 100644
index d8e70d1..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <!-- Package description. -->
- * Contains Ignite Hadoop 2.x <code>FileSystem</code> implementation.
- */
-package org.apache.ignite.hadoop.fs.v2;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
deleted file mode 100644
index 583af35..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.client.GridClient;
-import org.apache.ignite.internal.client.GridClientConfiguration;
-import org.apache.ignite.internal.client.GridClientException;
-import org.apache.ignite.internal.client.GridClientFactory;
-import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
-import org.apache.ignite.internal.processors.hadoop.proto.HadoopClientProtocol;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
-
-import static org.apache.ignite.internal.client.GridClientProtocol.TCP;
-
-
-/**
- * Ignite Hadoop client protocol provider.
- */
-public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
-    /** Framework name used in configuration. */
-    public static final String FRAMEWORK_NAME = "ignite";
-
-    /** Clients. */
-    private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>();
-
-    /** {@inheritDoc} */
-    @Override public ClientProtocol create(Configuration conf) throws IOException {
-        if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
-            String addr = conf.get(MRConfig.MASTER_ADDRESS);
-
-            if (F.isEmpty(addr))
-                throw new IOException("Failed to create client protocol because server address is not specified (is " +
-                    MRConfig.MASTER_ADDRESS + " property set?).");
-
-            if (F.eq(addr, "local"))
-                throw new IOException("Local execution mode is not supported, please point " +
-                    MRConfig.MASTER_ADDRESS + " to real Ignite node.");
-
-            return createProtocol(addr, conf);
-        }
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
-        if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME)))
-            return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf);
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close(ClientProtocol cliProto) throws IOException {
-        // No-op.
-    }
-
-    /**
-     * Internal protocol creation routine.
-     *
-     * @param addr Address.
-     * @param conf Configuration.
-     * @return Client protocol.
-     * @throws IOException If failed.
-     */
-    private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException {
-        return new HadoopClientProtocol(conf, client(addr));
-    }
-
-    /**
-     * Create client.
-     *
-     * @param addr Endpoint address.
-     * @return Client.
-     * @throws IOException If failed.
-     */
-    private static GridClient client(String addr) throws IOException {
-        try {
-            IgniteInternalFuture<GridClient> fut = cliMap.get(addr);
-
-            if (fut == null) {
-                GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>();
-
-                IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0);
-
-                if (oldFut != null)
-                    return oldFut.get();
-                else {
-                    GridClientConfiguration cliCfg = new GridClientConfiguration();
-
-                    cliCfg.setProtocol(TCP);
-                    cliCfg.setServers(Collections.singletonList(addr));
-                    cliCfg.setMarshaller(new GridClientJdkMarshaller());
-                    cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day.
-                    cliCfg.setDaemon(true);
-
-                    try {
-                        GridClient cli = GridClientFactory.start(cliCfg);
-
-                        fut0.onDone(cli);
-
-                        return cli;
-                    }
-                    catch (GridClientException e) {
-                        fut0.onDone(e);
-
-                        throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
-                    }
-                }
-            }
-            else
-                return fut.get();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
index d4a44fa..e1101c5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
@@ -17,16 +17,6 @@
 
 package org.apache.ignite.hadoop.mapreduce;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.UUID;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -38,13 +28,23 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
 import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
 import org.apache.ignite.internal.processors.igfs.IgfsEx;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.UUID;
+
 import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
 
 /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 27ffc19..2d1ac0b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -24,11 +24,11 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
 import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
 import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
@@ -116,7 +116,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
     /** {@inheritDoc} */
     @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
         @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
-        List<HadoopInputSplit> splits = HadoopUtils.sortInputSplits(job.input());
+        List<HadoopInputSplit> splits = HadoopCommonUtils.sortInputSplits(job.input());
         int reducerCnt = job.info().reducers();
 
         if (reducerCnt < 0)

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
deleted file mode 100644
index 7635b9e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <!-- Package description. -->
- * Ignite Hadoop Accelerator map-reduce classes.
- */
-package org.apache.ignite.hadoop.mapreduce;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
index 26dc4b2..12669aa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
@@ -17,14 +17,12 @@
 
 package org.apache.ignite.hadoop.util;
 
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.Serializable;
 
 /**
- * Hadoop file system name mapper. Used by {@link HadoopFileSystemFactory} implementation to pass proper user names
- * to the underlying Hadoop file system.
+ * Hadoop file system name mapper. Ensures that correct user name is passed to the underlying Hadoop file system.
  */
 public interface UserNameMapper extends Serializable {
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
deleted file mode 100644
index 23eaa18..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Arrays;
-
-/**
- * Hadoop attributes.
- */
-public class HadoopAttributes implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Attribute name. */
-    public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + ".hadoop";
-
-    /** Map-reduce planner class name. */
-    private String plannerCls;
-
-    /** External executor flag. */
-    private boolean extExec;
-
-    /** Maximum parallel tasks. */
-    private int maxParallelTasks;
-
-    /** Maximum task queue size. */
-    private int maxTaskQueueSize;
-
-    /** Library names. */
-    @GridToStringExclude
-    private String[] libNames;
-
-    /** Number of cores. */
-    private int cores;
-
-    /**
-     * Get attributes for node (if any).
-     *
-     * @param node Node.
-     * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node.
-     */
-    @Nullable public static HadoopAttributes forNode(ClusterNode node) {
-        return node.attribute(NAME);
-    }
-
-    /**
-     * {@link Externalizable} support.
-     */
-    public HadoopAttributes() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param cfg Configuration.
-     */
-    public HadoopAttributes(HadoopConfiguration cfg) {
-        assert cfg != null;
-        assert cfg.getMapReducePlanner() != null;
-
-        plannerCls = cfg.getMapReducePlanner().getClass().getName();
-
-        // TODO: IGNITE-404: Get from configuration when fixed.
-        extExec = false;
-
-        maxParallelTasks = cfg.getMaxParallelTasks();
-        maxTaskQueueSize = cfg.getMaxTaskQueueSize();
-        libNames = cfg.getNativeLibraryNames();
-
-        // Cores count already passed in other attributes, we add it here for convenience.
-        cores = Runtime.getRuntime().availableProcessors();
-    }
-
-    /**
-     * @return Map reduce planner class name.
-     */
-    public String plannerClassName() {
-        return plannerCls;
-    }
-
-    /**
-     * @return External execution flag.
-     */
-    public boolean externalExecution() {
-        return extExec;
-    }
-
-    /**
-     * @return Maximum parallel tasks.
-     */
-    public int maxParallelTasks() {
-        return maxParallelTasks;
-    }
-
-    /**
-     * @return Maximum task queue size.
-     */
-    public int maxTaskQueueSize() {
-        return maxTaskQueueSize;
-    }
-
-
-    /**
-     * @return Native library names.
-     */
-    public String[] nativeLibraryNames() {
-        return libNames;
-    }
-
-    /**
-     * @return Number of cores on machine.
-     */
-    public int cores() {
-        return cores;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(plannerCls);
-        out.writeBoolean(extExec);
-        out.writeInt(maxParallelTasks);
-        out.writeInt(maxTaskQueueSize);
-        out.writeObject(libNames);
-        out.writeInt(cores);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        plannerCls = (String)in.readObject();
-        extExec = in.readBoolean();
-        maxParallelTasks = in.readInt();
-        maxTaskQueueSize = in.readInt();
-        libNames = (String[])in.readObject();
-        cores = in.readInt();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopAttributes.class, this, "libNames", Arrays.toString(libNames));
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
new file mode 100644
index 0000000..83f94ce
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeSet;
+
+/**
+ * Common Hadoop utility methods which do not depend on Hadoop API.
+ */
+public class HadoopCommonUtils {
+    /**
+     * Sort input splits by length.
+     *
+     * @param splits Splits.
+     * @return Sorted splits.
+     */
+    public static List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> splits) {
+        int id = 0;
+
+        TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>();
+
+        for (HadoopInputSplit split : splits) {
+            long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0;
+
+            sortedSplits.add(new SplitSortWrapper(id++, split, len));
+        }
+
+        ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size());
+
+        for (SplitSortWrapper sortedSplit : sortedSplits)
+            res.add(sortedSplit.split);
+
+        return res;
+    }
+
+    /**
+     * Split wrapper for sorting.
+     */
+    private static class SplitSortWrapper implements Comparable<SplitSortWrapper> {
+        /** Unique ID. */
+        private final int id;
+
+        /** Split. */
+        private final HadoopInputSplit split;
+
+        /** Split length. */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param id Unique ID.
+         * @param split Split.
+         * @param len Split length.
+         */
+        public SplitSortWrapper(int id, HadoopInputSplit split, long len) {
+            this.id = id;
+            this.split = split;
+            this.len = len;
+        }
+
+        /**
+         * Orders wrappers by split length, longest first; wrappers of equal length keep ascending ID order.
+         * Uses {@code compare} helpers rather than subtraction so the result cannot be skewed by overflow.
+         */
+        @SuppressWarnings("NullableProblems")
+        @Override public int compareTo(SplitSortWrapper other) {
+            // Arguments are swapped on purpose: a larger length must sort earlier.
+            int res = Long.compare(other.len, len);
+
+            // IDs are unique per wrapper, so distinct wrappers never compare as equal.
+            return res != 0 ? res : Integer.compare(id, other.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id;
+        }
+    }
+
+    /**
+     * Private constructor.
+     */
+    private HadoopCommonUtils() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
deleted file mode 100644
index aeda5c0..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-
-/**
- * Abstract class for all hadoop components.
- */
-public abstract class HadoopComponent {
-    /** Hadoop context. */
-    protected HadoopContext ctx;
-
-    /** Logger. */
-    protected IgniteLogger log;
-
-    /**
-     * @param ctx Hadoop context.
-     */
-    public void start(HadoopContext ctx) throws IgniteCheckedException {
-        this.ctx = ctx;
-
-        log = ctx.kernalContext().log(getClass());
-    }
-
-    /**
-     * Stops manager.
-     */
-    public void stop(boolean cancel) {
-        // No-op.
-    }
-
-    /**
-     * Callback invoked when all grid components are started.
-     */
-    public void onKernalStart() throws IgniteCheckedException {
-        // No-op.
-    }
-
-    /**
-     * Callback invoked before all grid components are stopped.
-     */
-    public void onKernalStop(boolean cancel) {
-        // No-op.
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
deleted file mode 100644
index 42a3d72..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
-import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskExecutorAdapter;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-
-/**
- * Hadoop accelerator context.
- */
-public class HadoopContext {
-    /** Kernal context. */
-    private GridKernalContext ctx;
-
-    /** Hadoop configuration. */
-    private HadoopConfiguration cfg;
-
-    /** Job tracker. */
-    private HadoopJobTracker jobTracker;
-
-    /** External task executor. */
-    private HadoopTaskExecutorAdapter taskExecutor;
-
-    /** */
-    private HadoopShuffle shuffle;
-
-    /** Managers list. */
-    private List<HadoopComponent> components = new ArrayList<>();
-
-    /**
-     * @param ctx Kernal context.
-     */
-    public HadoopContext(
-        GridKernalContext ctx,
-        HadoopConfiguration cfg,
-        HadoopJobTracker jobTracker,
-        HadoopTaskExecutorAdapter taskExecutor,
-        HadoopShuffle shuffle
-    ) {
-        this.ctx = ctx;
-        this.cfg = cfg;
-
-        this.jobTracker = add(jobTracker);
-        this.taskExecutor = add(taskExecutor);
-        this.shuffle = add(shuffle);
-    }
-
-    /**
-     * Gets list of managers.
-     *
-     * @return List of managers.
-     */
-    public List<HadoopComponent> components() {
-        return components;
-    }
-
-    /**
-     * Gets kernal context.
-     *
-     * @return Grid kernal context instance.
-     */
-    public GridKernalContext kernalContext() {
-        return ctx;
-    }
-
-    /**
-     * Gets Hadoop configuration.
-     *
-     * @return Hadoop configuration.
-     */
-    public HadoopConfiguration configuration() {
-        return cfg;
-    }
-
-    /**
-     * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}.
-     *
-     * @return Local node ID.
-     */
-    public UUID localNodeId() {
-        return ctx.localNodeId();
-    }
-
-    /**
-     * Gets local node order.
-     *
-     * @return Local node order.
-     */
-    public long localNodeOrder() {
-        assert ctx.discovery() != null;
-
-        return ctx.discovery().localNode().order();
-    }
-
-    /**
-     * @return Hadoop-enabled nodes.
-     */
-    public Collection<ClusterNode> nodes() {
-        return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersionEx());
-    }
-
-    /**
-     * @return {@code True} if
-     */
-    public boolean jobUpdateLeader() {
-        long minOrder = Long.MAX_VALUE;
-        ClusterNode minOrderNode = null;
-
-        for (ClusterNode node : nodes()) {
-            if (node.order() < minOrder) {
-                minOrder = node.order();
-                minOrderNode = node;
-            }
-        }
-
-        assert minOrderNode != null;
-
-        return localNodeId().equals(minOrderNode.id());
-    }
-
-    /**
-     * @param meta Job metadata.
-     * @return {@code true} If local node is participating in job execution.
-     */
-    public boolean isParticipating(HadoopJobMetadata meta) {
-        UUID locNodeId = localNodeId();
-
-        if (locNodeId.equals(meta.submitNodeId()))
-            return true;
-
-        HadoopMapReducePlan plan = meta.mapReducePlan();
-
-        return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader();
-    }
-
-    /**
-     * @return Jon tracker instance.
-     */
-    public HadoopJobTracker jobTracker() {
-        return jobTracker;
-    }
-
-    /**
-     * @return Task executor.
-     */
-    public HadoopTaskExecutorAdapter taskExecutor() {
-        return taskExecutor;
-    }
-
-    /**
-     * @return Shuffle.
-     */
-    public HadoopShuffle shuffle() {
-        return shuffle;
-    }
-
-    /**
-     * @return Map-reduce planner.
-     */
-    public HadoopMapReducePlanner planner() {
-        return cfg.getMapReducePlanner();
-    }
-
-    /**
-     * Adds component.
-     *
-     * @param c Component to add.
-     * @return Added manager.
-     */
-    private <C extends HadoopComponent> C add(C c) {
-        components.add(c);
-
-        return c;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
deleted file mode 100644
index ae17ac8..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop job info based on default Hadoop configuration.
- */
-public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
-    /** */
-    private static final long serialVersionUID = 5489900236464999951L;
-
-    /** {@code true} If job has combiner. */
-    private boolean hasCombiner;
-
-    /** Number of reducers configured for job. */
-    private int numReduces;
-
-    /** Configuration. */
-    private Map<String,String> props = new HashMap<>();
-
-    /** Job name. */
-    private String jobName;
-
-    /** User name. */
-    private String user;
-
-    /**
-     * Default constructor required by {@link Externalizable}.
-     */
-    public HadoopDefaultJobInfo() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param jobName Job name.
-     * @param user User name.
-     * @param hasCombiner {@code true} If job has combiner.
-     * @param numReduces Number of reducers configured for job.
-     * @param props All other properties of the job.
-     */
-    public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces,
-        Map<String, String> props) {
-        this.jobName = jobName;
-        this.user = user;
-        this.hasCombiner = hasCombiner;
-        this.numReduces = numReduces;
-        this.props = props;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public String property(String name) {
-        return props.get(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
-        @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException {
-        assert jobCls != null;
-
-        try {
-            Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class,
-                HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class, HadoopHelper.class);
-
-            return constructor.newInstance(jobId, this, log, libNames, helper);
-        }
-        catch (Throwable t) {
-            if (t instanceof Error)
-                throw (Error)t;
-            
-            throw new IgniteCheckedException(t);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasCombiner() {
-        return hasCombiner;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasReducer() {
-        return reducers() > 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int reducers() {
-        return numReduces;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String jobName() {
-        return jobName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String user() {
-        return user;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeString(out, jobName);
-        U.writeString(out, user);
-
-        out.writeBoolean(hasCombiner);
-        out.writeInt(numReduces);
-
-        U.writeStringMap(out, props);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        jobName = U.readString(in);
-        user = U.readString(in);
-
-        hasCombiner = in.readBoolean();
-        numReduces = in.readInt();
-
-        props = U.readStringMap(in);
-    }
-
-    /**
-     * @return Properties of the job.
-     */
-    public Map<String, String> properties() {
-        return props;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
deleted file mode 100644
index ed2657e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop facade implementation.
- */
-public class HadoopImpl implements Hadoop {
-    /** Hadoop processor. */
-    private final HadoopProcessor proc;
-
-    /** Busy lock. */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
-    /**
-     * Constructor.
-     *
-     * @param proc Hadoop processor.
-     */
-    HadoopImpl(HadoopProcessor proc) {
-        this.proc = proc;
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopConfiguration configuration() {
-        return proc.config();
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopJobId nextJobId() {
-        if (busyLock.enterBusy()) {
-            try {
-                return proc.nextJobId();
-            }
-            finally {
-                busyLock.leaveBusy();
-            }
-        }
-        else
-            throw new IllegalStateException("Failed to get next job ID (grid is stopping).");
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
-        if (busyLock.enterBusy()) {
-            try {
-                return proc.submit(jobId, jobInfo);
-            }
-            finally {
-                busyLock.leaveBusy();
-            }
-        }
-        else
-            throw new IllegalStateException("Failed to submit job (grid is stopping).");
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
-        if (busyLock.enterBusy()) {
-            try {
-                return proc.status(jobId);
-            }
-            finally {
-                busyLock.leaveBusy();
-            }
-        }
-        else
-            throw new IllegalStateException("Failed to get job status (grid is stopping).");
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
-        if (busyLock.enterBusy()) {
-            try {
-                return proc.counters(jobId);
-            }
-            finally {
-                busyLock.leaveBusy();
-            }
-        }
-        else
-            throw new IllegalStateException("Failed to get job counters (grid is stopping).");
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
-        if (busyLock.enterBusy()) {
-            try {
-                return proc.finishFuture(jobId);
-            }
-            finally {
-                busyLock.leaveBusy();
-            }
-        }
-        else
-            throw new IllegalStateException("Failed to get job finish future (grid is stopping).");
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
-        if (busyLock.enterBusy()) {
-            try {
-                return proc.kill(jobId);
-            }
-            finally {
-                busyLock.leaveBusy();
-            }
-        }
-        else
-            throw new IllegalStateException("Failed to kill job (grid is stopping).");
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
deleted file mode 100644
index 4e03e17..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
-
-/**
- * Hadoop +counter group adapter.
- */
-class HadoopMapReduceCounterGroup implements CounterGroup {
-    /** Counters. */
-    private final HadoopMapReduceCounters cntrs;
-
-    /** Group name. */
-    private final String name;
-
-    /**
-     * Creates new instance.
-     *
-     * @param cntrs Client counters instance.
-     * @param name Group name.
-     */
-    HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
-        this.cntrs = cntrs;
-        this.name = name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getName() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getDisplayName() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setDisplayName(String displayName) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addCounter(Counter counter) {
-        addCounter(counter.getName(), counter.getDisplayName(), 0);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter addCounter(String name, String displayName, long value) {
-        final Counter counter = cntrs.findCounter(this.name, name);
-
-        counter.setValue(value);
-
-        return counter;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String counterName, String displayName) {
-        return cntrs.findCounter(name, counterName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String counterName, boolean create) {
-        return cntrs.findCounter(name, counterName, create);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counter findCounter(String counterName) {
-        return cntrs.findCounter(name, counterName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return cntrs.groupSize(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
-        for (final Counter counter : rightGroup)
-            cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
-    }
-
-    /** {@inheritDoc} */
-    @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
-        return this;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<Counter> iterator() {
-        return cntrs.iterateGroup(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(DataOutput out) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readFields(DataInput in) throws IOException {
-        throw new UnsupportedOperationException("not implemented");
-    }
-}
\ No newline at end of file


Mime
View raw message