ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [31/61] [abbrv] incubator-ignite git commit: IGNITE-386: Squashed changes.
Date Thu, 05 Mar 2015 10:49:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
new file mode 100644
index 0000000..351839a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Counter for the job statistics accumulation.
+ */
+public class HadoopPerformanceCounter extends HadoopCounterAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** The group name for this counter. */
+    private static final String GROUP_NAME = "SYSTEM";
+
+    /** The counter name for this counter. */
+    private static final String COUNTER_NAME = "PERFORMANCE";
+
+    /** Collected events as (event name, timestamp) pairs. */
+    private Collection<T2<String,Long>> evts = new ArrayList<>();
+
+    /** Node id to insert into the event info. */
+    private UUID nodeId;
+
+    /** Reducer number passed to the most recent {@link #onShuffleMessage(int, long)} call. */
+    private int reducerNum;
+
+    /** Timestamp of the first registered shuffle message, {@code null} until one is registered. */
+    private volatile Long firstShuffleMsg;
+
+    /** Timestamp of the latest shuffle message; reset to {@code null} once reported by {@link #onTaskFinish}. */
+    private volatile Long lastShuffleMsg;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    public HadoopPerformanceCounter() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param grp Group name.
+     * @param name Counter name.
+     */
+    public HadoopPerformanceCounter(String grp, String name) {
+        super(grp, name);
+    }
+
+    /**
+     * Constructor to create instance to use this as helper.
+     *
+     * @param nodeId Id of the work node.
+     */
+    public HadoopPerformanceCounter(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeValue(ObjectOutput out) throws IOException {
+        U.writeCollection(out, evts);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readValue(ObjectInput in) throws IOException {
+        try {
+            evts = U.readCollection(in);
+        }
+        catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void merge(HadoopCounter cntr) {
+        evts.addAll(((HadoopPerformanceCounter)cntr).evts);
+    }
+
+    /**
+     * Gets the events collection.
+     *
+     * @return Collection of event.
+     */
+    public Collection<T2<String, Long>> evts() {
+        return evts;
+    }
+
+    /**
+     * Generate name that consists of some event information.
+     *
+     * @param info Task info.
+     * @param evtType The type of the event.
+     * @return String contains necessary event information.
+     */
+    private String eventName(HadoopTaskInfo info, String evtType) {
+        return eventName(info.type().toString(), info.taskNumber(), evtType);
+    }
+
+    /**
+     * Generate name that consists of some event information.
+     *
+     * @param taskType Task type.
+     * @param taskNum Number of the task.
+     * @param evtType The type of the event.
+     * @return String contains necessary event information.
+     */
+    private String eventName(String taskType, int taskNum, String evtType) {
+        assert nodeId != null;
+
+        return taskType + " " + taskNum + " " + evtType + " " + nodeId;
+    }
+
+    /**
+     * Adds event of the task submission (task instance creation).
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskSubmit(HadoopTaskInfo info, long ts) {
+        evts.add(new T2<>(eventName(info, "submit"), ts));
+    }
+
+    /**
+     * Adds event of the task preparation.
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskPrepare(HadoopTaskInfo info, long ts) {
+        evts.add(new T2<>(eventName(info, "prepare"), ts));
+    }
+
+    /**
+     * Adds the task finish event; a REDUCE task with registered shuffle messages also gets SHUFFLE start/finish events.
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskFinish(HadoopTaskInfo info, long ts) {
+        if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) {
+            evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg));
+            evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg));
+
+            lastShuffleMsg = null;
+        }
+
+        evts.add(new T2<>(eventName(info, "finish"), ts));
+    }
+
+    /**
+     * Adds event of the task run.
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskStart(HadoopTaskInfo info, long ts) {
+        evts.add(new T2<>(eventName(info, "start"), ts));
+    }
+
+    /**
+     * Adds event of the job preparation.
+     *
+     * @param ts Timestamp of the event.
+     */
+    public void onJobPrepare(long ts) {
+        assert nodeId != null;
+
+        evts.add(new T2<>("JOB prepare " + nodeId, ts));
+    }
+
+    /**
+     * Adds event of the job start.
+     *
+     * @param ts Timestamp of the event.
+     */
+    public void onJobStart(long ts) {
+        assert nodeId != null;
+
+        evts.add(new T2<>("JOB start " + nodeId, ts));
+    }
+
+    /**
+     * Adds client submission events from job info.
+     *
+     * @param info Job info.
+     */
+    public void clientSubmissionEvents(HadoopJobInfo info) {
+        assert nodeId != null;
+
+        addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY);
+        addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY);
+        addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY);
+    }
+
+    /**
+     * Adds event with timestamp from some property in job info.
+     *
+     * @param evt Event type and phase.
+     * @param info Job info.
+     * @param propName Property name to get timestamp.
+     */
+    private void addEventFromProperty(String evt, HadoopJobInfo info, String propName) {
+        String val = info.property(propName);
+
+        if (!F.isEmpty(val)) {
+            try {
+                evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val)));
+            }
+            catch (NumberFormatException e) {
+                throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e);
+            }
+        }
+    }
+
+    /**
+     * Registers shuffle message event.
+     *
+     * @param reducerNum Number of reducer that receives the data.
+     * @param ts Timestamp of the event.
+     */
+    public void onShuffleMessage(int reducerNum, long ts) {
+        this.reducerNum = reducerNum;
+
+        if (firstShuffleMsg == null)
+            firstShuffleMsg = ts;
+
+        lastShuffleMsg = ts;
+    }
+
+    /**
+     * Gets system predefined performance counter from the HadoopCounters object.
+     *
+     * @param cntrs HadoopCounters object.
+     * @param nodeId Node id for the methods that add events. May be {@code null} if those methods are not used.
+     * @return Predefined performance counter.
+     */
+    public static HadoopPerformanceCounter getCounter(HadoopCounters cntrs, @Nullable UUID nodeId) {
+        HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class);
+
+        if (nodeId != null)
+            cntr.nodeId(nodeId);
+
+        return cntr;
+    }
+
+    /**
+     * Sets the nodeId field.
+     *
+     * @param nodeId Node id.
+     */
+    private void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
deleted file mode 100644
index e9461e2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.IgfsConfiguration.*;
-
-/**
- * Wrapper of HDFS for support of separated working directory.
- */
-public class GridHadoopDistributedFileSystem extends DistributedFileSystem {
-    /** User name for each thread. */
-    private final ThreadLocal<String> userName = new ThreadLocal<String>() {
-        /** {@inheritDoc} */
-        @Override protected String initialValue() {
-            return DFLT_USER_NAME;
-        }
-    };
-
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
-        /** {@inheritDoc} */
-        @Override protected Path initialValue() {
-            return getHomeDirectory();
-        }
-    };
-
-    /** {@inheritDoc} */
-    @Override public void initialize(URI uri, Configuration conf) throws IOException {
-        super.initialize(uri, conf);
-
-        setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
-    }
-
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    public void setUser(String userName) {
-        this.userName.set(userName);
-
-        setWorkingDirectory(getHomeDirectory());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + userName.get());
-
-        return path.makeQualified(getUri(), null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path dir) {
-        Path fixedDir = fixRelativePart(dir);
-
-        String res = fixedDir.toUri().getPath();
-
-        if (!DFSUtil.isValidName(res))
-            throw new IllegalArgumentException("Invalid DFS directory name " + res);
-
-        workingDir.set(fixedDir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workingDir.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
deleted file mode 100644
index 52e7d29..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.igfs.hadoop.v1.*;
-
-/**
- * Utilities for configuring file systems to support the separate working directory per each thread.
- */
-public class GridHadoopFileSystemsUtils {
-    /** Name of the property for setting working directory on create new local FS instance. */
-    public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
-
-    /**
-     * Set user name and default working directory for current thread if it's supported by file system.
-     *
-     * @param fs File system.
-     * @param userName User name.
-     */
-    public static void setUser(FileSystem fs, String userName) {
-        if (fs instanceof IgfsHadoopFileSystem)
-            ((IgfsHadoopFileSystem)fs).setUser(userName);
-        else if (fs instanceof GridHadoopDistributedFileSystem)
-            ((GridHadoopDistributedFileSystem)fs).setUser(userName);
-    }
-
-    /**
-     * Setup wrappers of filesystems to support the separate working directory.
-     *
-     * @param cfg Config for setup.
-     */
-    public static void setupFileSystems(Configuration cfg) {
-        cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", GridHadoopLocalFileSystemV1.class.getName());
-        cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
-                GridHadoopLocalFileSystemV2.class.getName());
-
-        cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", GridHadoopDistributedFileSystem.class.getName());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
deleted file mode 100644
index 28834d4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.fs.*;
-
-import java.io.*;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class GridHadoopLocalFileSystemV1 extends LocalFileSystem {
-    /**
-     * Creates new local file system.
-     */
-    public GridHadoopLocalFileSystemV1() {
-        super(new GridHadoopRawLocalFileSystem());
-    }
-
-    /** {@inheritDoc} */
-    @Override public File pathToFile(Path path) {
-        return ((GridHadoopRawLocalFileSystem)getRaw()).convert(path);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
deleted file mode 100644
index 62d7cea..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.local.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.hadoop.fs.FsConstants.*;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class GridHadoopLocalFileSystemV2 extends ChecksumFs {
-    /**
-     * Creates new local file system.
-     *
-     * @param cfg Configuration.
-     * @throws IOException If failed.
-     * @throws URISyntaxException If failed.
-     */
-    public GridHadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException {
-        super(new DelegateFS(cfg));
-    }
-
-    /**
-     * Creates new local file system.
-     *
-     * @param uri URI.
-     * @param cfg Configuration.
-     * @throws IOException If failed.
-     * @throws URISyntaxException If failed.
-     */
-    public GridHadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException {
-        this(cfg);
-    }
-
-    /**
-     * Delegate file system.
-     */
-    private static class DelegateFS extends DelegateToFileSystem {
-        /**
-         * Creates new local file system.
-         *
-         * @param cfg Configuration.
-         * @throws IOException If failed.
-         * @throws URISyntaxException If failed.
-         */
-        public DelegateFS(Configuration cfg) throws IOException, URISyntaxException {
-            super(LOCAL_FS_URI, new GridHadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int getUriDefaultPort() {
-            return -1;
-        }
-
-        /** {@inheritDoc} */
-        @Override public FsServerDefaults getServerDefaults() throws IOException {
-            return LocalConfigKeys.getServerDefaults();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isValidName(String src) {
-            return true;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
deleted file mode 100644
index 29645f8..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.file.*;
-
-/**
- * Local file system implementation for Hadoop.
- */
-public class GridHadoopRawLocalFileSystem extends FileSystem {
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
-        @Override protected Path initialValue() {
-            return getInitialWorkingDirectory();
-        }
-    };
-
-    /**
-     * Converts Hadoop path to local path.
-     *
-     * @param path Hadoop path.
-     * @return Local path.
-     */
-    File convert(Path path) {
-        checkPath(path);
-
-        if (path.isAbsolute())
-            return new File(path.toUri().getPath());
-
-        return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getHomeDirectory() {
-        return makeQualified(new Path(System.getProperty("user.home")));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getInitialWorkingDirectory() {
-        File f = new File(System.getProperty("user.dir"));
-
-        return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize(URI uri, Configuration conf) throws IOException {
-        super.initialize(uri, conf);
-
-        setConf(conf);
-
-        String initWorkDir = conf.get(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
-
-        if (initWorkDir != null)
-            setWorkingDirectory(new Path(initWorkDir));
-    }
-
-    /** {@inheritDoc} */
-    @Override public URI getUri() {
-        return FsConstants.LOCAL_FS_URI;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-        return new FSDataInputStream(new InStream(checkExists(convert(f))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
-        short replication, long blockSize, Progressable progress) throws IOException {
-        File file = convert(f);
-
-        if (!overwrite && !file.createNewFile())
-            throw new IOException("Failed to create new file: " + f.toUri());
-
-        return out(file, false, bufSize);
-    }
-
-    /**
-     * @param file File.
-     * @param append Append flag.
-     * @return Output stream.
-     * @throws IOException If failed.
-     */
-    private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
-        return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append),
-            bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme()));
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
-        return out(convert(f), true, bufSize);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean rename(Path src, Path dst) throws IOException {
-        return convert(src).renameTo(convert(dst));
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean delete(Path f, boolean recursive) throws IOException {
-        File file = convert(f);
-
-        if (file.isDirectory() && !recursive)
-            throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri());
-
-        return U.delete(file);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path dir) {
-        workDir.set(fixRelativePart(dir));
-
-        checkPath(dir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workDir.get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-        if(f == null)
-            throw new IllegalArgumentException("mkdirs path arg is null");
-
-        Path parent = f.getParent();
-
-        File p2f = convert(f);
-
-        if(parent != null) {
-            File parent2f = convert(parent);
-
-            if(parent2f != null && parent2f.exists() && !parent2f.isDirectory())
-                throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
-
-        }
-
-        return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileStatus(Path f) throws IOException {
-        return fileStatus(checkExists(convert(f)));
-    }
-
-    /**
-     * @return File status.
-     */
-    private FileStatus fileStatus(File file) throws IOException {
-        boolean dir = file.isDirectory();
-
-        java.nio.file.Path path = dir ? null : file.toPath();
-
-        return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
-            /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ?
-            new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI()));
-    }
-
-    /**
-     * @param file File.
-     * @return Same file.
-     * @throws FileNotFoundException If does not exist.
-     */
-    private static File checkExists(File file) throws FileNotFoundException {
-        if (!file.exists())
-            throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
-
-        return file;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus[] listStatus(Path f) throws IOException {
-        File file = convert(f);
-
-        if (checkExists(file).isFile())
-            return new FileStatus[] {fileStatus(file)};
-
-        File[] files = file.listFiles();
-
-        FileStatus[] res = new FileStatus[files.length];
-
-        for (int i = 0; i < res.length; i++)
-            res[i] = fileStatus(files[i]);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean supportsSymlinks() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
-        Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath());
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
-        return getFileStatus(getLinkTarget(f));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getLinkTarget(Path f) throws IOException {
-        File file = Files.readSymbolicLink(convert(f).toPath()).toFile();
-
-        return new Path(file.toURI());
-    }
-
-    /**
-     * Input stream.
-     */
-    private static class InStream extends InputStream implements Seekable, PositionedReadable {
-        /** */
-        private final RandomAccessFile file;
-
-        /**
-         * @param f File.
-         * @throws IOException If failed.
-         */
-        public InStream(File f) throws IOException {
-            file = new RandomAccessFile(f, "r");
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read() throws IOException {
-            return file.read();
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
-            return file.read(b, off, len);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized void close() throws IOException {
-            file.close();
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
-            long pos0 = file.getFilePointer();
-
-            file.seek(pos);
-            int res = file.read(buf, off, len);
-
-            file.seek(pos0);
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
-            if (read(pos, buf, off, len) != len)
-                throw new IOException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readFully(long pos, byte[] buf) throws IOException {
-            readFully(pos, buf, 0, buf.length);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized void seek(long pos) throws IOException {
-            file.seek(pos);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized long getPos() throws IOException {
-            return file.getFilePointer();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean seekToNewSource(long targetPos) throws IOException {
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
new file mode 100644
index 0000000..509f443
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+
+/**
+ * HDFS wrapper that supports a separate working directory for each thread.
+ */
+public class HadoopDistributedFileSystem extends DistributedFileSystem {
+    /** User name for each thread. */
+    private final ThreadLocal<String> userName = new ThreadLocal<String>() {
+        /** {@inheritDoc} */
+        @Override protected String initialValue() {
+            return DFLT_USER_NAME;
+        }
+    };
+
+    /** Working directory for each thread. */
+    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
+        /** {@inheritDoc} */
+        @Override protected Path initialValue() {
+            return getHomeDirectory();
+        }
+    };
+
+    /** {@inheritDoc} */
+    @Override public void initialize(URI uri, Configuration conf) throws IOException {
+        super.initialize(uri, conf);
+
+        setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+    }
+
+    /**
+     * Set user name and default working directory for current thread.
+     *
+     * @param userName User name.
+     */
+    public void setUser(String userName) {
+        this.userName.set(userName);
+
+        setWorkingDirectory(getHomeDirectory());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getHomeDirectory() {
+        Path path = new Path("/user/" + userName.get());
+
+        return path.makeQualified(getUri(), null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setWorkingDirectory(Path dir) {
+        Path fixedDir = fixRelativePart(dir);
+
+        String res = fixedDir.toUri().getPath();
+
+        if (!DFSUtil.isValidName(res))
+            throw new IllegalArgumentException("Invalid DFS directory name " + res);
+
+        workingDir.set(fixedDir);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getWorkingDirectory() {
+        return workingDir.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
new file mode 100644
index 0000000..f3f51d4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+
+/**
+ * Utilities for configuring file systems to support a separate working directory for each thread.
+ */
+public class HadoopFileSystemsUtils {
+    /** Name of the property that sets the working directory when a new local FS instance is created. */
+    public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
+
+    /**
+     * Set user name and default working directory for current thread if it's supported by file system.
+     *
+     * @param fs File system.
+     * @param userName User name.
+     */
+    public static void setUser(FileSystem fs, String userName) {
+        if (fs instanceof IgniteHadoopFileSystem)
+            ((IgniteHadoopFileSystem)fs).setUser(userName);
+        else if (fs instanceof HadoopDistributedFileSystem)
+            ((HadoopDistributedFileSystem)fs).setUser(userName);
+    }
+
+    /**
+     * Sets up file system wrappers that support a separate working directory.
+     *
+     * @param cfg Config for setup.
+     */
+    public static void setupFileSystems(Configuration cfg) {
+        cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
+        cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
+                HadoopLocalFileSystemV2.class.getName());
+
+        cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
new file mode 100644
index 0000000..9cc5881
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.fs.*;
+
+import java.io.*;
+
+/**
+ * Local file system replacement for Hadoop jobs.
+ */
+public class HadoopLocalFileSystemV1 extends LocalFileSystem {
+    /**
+     * Creates new local file system.
+     */
+    public HadoopLocalFileSystemV1() {
+        super(new HadoopRawLocalFileSystem());
+    }
+
+    /** {@inheritDoc} */
+    @Override public File pathToFile(Path path) {
+        return ((HadoopRawLocalFileSystem)getRaw()).convert(path);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
new file mode 100644
index 0000000..15ddc5a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.local.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.hadoop.fs.FsConstants.*;
+
+/**
+ * Local file system replacement for Hadoop jobs.
+ */
+public class HadoopLocalFileSystemV2 extends ChecksumFs {
+    /**
+     * Creates new local file system.
+     *
+     * @param cfg Configuration.
+     * @throws IOException If failed.
+     * @throws URISyntaxException If failed.
+     */
+    public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException {
+        super(new DelegateFS(cfg));
+    }
+
+    /**
+     * Creates new local file system.
+     *
+     * @param uri URI.
+     * @param cfg Configuration.
+     * @throws IOException If failed.
+     * @throws URISyntaxException If failed.
+     */
+    public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException {
+        this(cfg);
+    }
+
+    /**
+     * Delegate file system.
+     */
+    private static class DelegateFS extends DelegateToFileSystem {
+        /**
+         * Creates new local file system.
+         *
+         * @param cfg Configuration.
+         * @throws IOException If failed.
+         * @throws URISyntaxException If failed.
+         */
+        public DelegateFS(Configuration cfg) throws IOException, URISyntaxException {
+            super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getUriDefaultPort() {
+            return -1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FsServerDefaults getServerDefaults() throws IOException {
+            return LocalConfigKeys.getServerDefaults();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isValidName(String src) {
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
new file mode 100644
index 0000000..7edcec0
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+/**
+ * This class lists parameters that can be specified in Hadoop configuration.
+ * Hadoop configuration can be specified in {@code core-site.xml} file
+ * or passed to map-reduce task directly when using Hadoop driver for IGFS file system:
+ * <ul>
+ *     <li>
+ *         {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides
+ *         the one specified in {@link org.apache.ignite.configuration.FileSystemConfiguration#getSequentialReadsBeforePrefetch()}
+ *         IGFS data node configuration property.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If
+ *         {@code true}, then all file system operations will be logged to a file.
+ *     </li>
+ *     <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li>
+ *     <li>
+ *         {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before
+ *         it gets flushed to log file. Higher values will imply greater performance, but will increase delay
+ *         before record appears in the log file.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data
+ *         node to which client is connected. If {@code true}, file will not be distributed and will be written
+ *         to a single data node. Default value is {@code true}.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.prefer.local.writes} - specifies whether file preferably should be written to
+ *         local data node if it has enough free space. After some time it can be redistributed across nodes though.
+ *     </li>
+ * </ul>
+ * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. in
+ * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}.
+ * <p>
+ * Sample configuration that can be placed to {@code core-site.xml} file:
+ * <pre name="code" class="xml">
+ *     &lt;property&gt;
+ *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.enabled&lt;/name&gt;
+ *         &lt;value&gt;true&lt;/value&gt;
+ *     &lt;/property&gt;
+ *     &lt;property&gt;
+ *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.dir&lt;/name&gt;
+ *         &lt;value&gt;/home/apache/ignite/log/sampling&lt;/value&gt;
+ *     &lt;/property&gt;
+ *     &lt;property&gt;
+ *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.batch_size&lt;/name&gt;
+ *         &lt;value&gt;16&lt;/value&gt;
+ *     &lt;/property&gt;
+ * </pre>
+ * Parameters could also be specified per mapreduce job, e.g.
+ * <pre name="code" class="bash">
+ * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.127.0.0.1:10500.open.sequential_reads_before_prefetch=4
+ * </pre>
+ * If you want to use these parameters in code, you have to substitute your file system name into them. The easiest
+ * way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}.
+ */
+public class HadoopParameters {
+    /** Parameter name for control over file colocation write mode. */
+    public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes";
+
+    /** Parameter name for custom sequential reads before prefetch value. */
+    public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH =
+        "fs.igfs.%s.open.sequential_reads_before_prefetch";
+
+    /** Parameter name for client logger directory. */
+    public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir";
+
+    /** Parameter name for log batch size. */
+    public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size";
+
+    /** Parameter name for log enabled flag. */
+    public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled";
+
+    /** Parameter name for prefer local writes flag. */
+    public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes";
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
new file mode 100644
index 0000000..e5ec3f7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+
+/**
+ * Local file system implementation for Hadoop.
+ */
+public class HadoopRawLocalFileSystem extends FileSystem {
+    /** Working directory for each thread. */
+    private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
+        @Override protected Path initialValue() {
+            return getInitialWorkingDirectory();
+        }
+    };
+
+    /**
+     * Converts Hadoop path to local path.
+     *
+     * @param path Hadoop path.
+     * @return Local path.
+     */
+    File convert(Path path) {
+        checkPath(path);
+
+        if (path.isAbsolute())
+            return new File(path.toUri().getPath());
+
+        return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getHomeDirectory() {
+        return makeQualified(new Path(System.getProperty("user.home")));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getInitialWorkingDirectory() {
+        File f = new File(System.getProperty("user.dir"));
+
+        return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(URI uri, Configuration conf) throws IOException {
+        super.initialize(uri, conf);
+
+        setConf(conf);
+
+        String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
+
+        if (initWorkDir != null)
+            setWorkingDirectory(new Path(initWorkDir));
+    }
+
+    /** {@inheritDoc} */
+    @Override public URI getUri() {
+        return FsConstants.LOCAL_FS_URI;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+        return new FSDataInputStream(new InStream(checkExists(convert(f))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
+        short replication, long blockSize, Progressable progress) throws IOException {
+        File file = convert(f);
+
+        if (!overwrite && !file.createNewFile())
+            throw new IOException("Failed to create new file: " + f.toUri());
+
+        return out(file, false, bufSize);
+    }
+
+    /**
+     * @param file File.
+     * @param append Append flag.
+     * @return Output stream.
+     * @throws IOException If failed.
+     */
+    private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
+        return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append),
+            bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
+        return out(convert(f), true, bufSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean rename(Path src, Path dst) throws IOException {
+        return convert(src).renameTo(convert(dst));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(Path f, boolean recursive) throws IOException {
+        File file = convert(f);
+
+        if (file.isDirectory() && !recursive)
+            throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri());
+
+        return U.delete(file);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setWorkingDirectory(Path dir) {
+        workDir.set(fixRelativePart(dir));
+
+        checkPath(dir);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getWorkingDirectory() {
+        return workDir.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+        if(f == null)
+            throw new IllegalArgumentException("mkdirs path arg is null");
+
+        Path parent = f.getParent();
+
+        File p2f = convert(f);
+
+        if(parent != null) {
+            File parent2f = convert(parent);
+
+            if(parent2f != null && parent2f.exists() && !parent2f.isDirectory())
+                throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
+
+        }
+
+        return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileStatus getFileStatus(Path f) throws IOException {
+        return fileStatus(checkExists(convert(f)));
+    }
+
+    /**
+     * @return File status.
+     */
+    private FileStatus fileStatus(File file) throws IOException {
+        boolean dir = file.isDirectory();
+
+        java.nio.file.Path path = dir ? null : file.toPath();
+
+        return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
+            /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ?
+            new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI()));
+    }
+
+    /**
+     * @param file File.
+     * @return Same file.
+     * @throws FileNotFoundException If does not exist.
+     */
+    private static File checkExists(File file) throws FileNotFoundException {
+        if (!file.exists())
+            throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
+
+        return file;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileStatus[] listStatus(Path f) throws IOException {
+        File file = convert(f);
+
+        if (checkExists(file).isFile())
+            return new FileStatus[] {fileStatus(file)};
+
+        File[] files = file.listFiles();
+
+        FileStatus[] res = new FileStatus[files.length];
+
+        for (int i = 0; i < res.length; i++)
+            res[i] = fileStatus(files[i]);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean supportsSymlinks() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
+        Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath());
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
+        return getFileStatus(getLinkTarget(f));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getLinkTarget(Path f) throws IOException {
+        File file = Files.readSymbolicLink(convert(f).toPath()).toFile();
+
+        return new Path(file.toURI());
+    }
+
+    /**
+     * Input stream.
+     */
+    private static class InStream extends InputStream implements Seekable, PositionedReadable {
+        /** */
+        private final RandomAccessFile file;
+
+        /**
+         * @param f File.
+         * @throws IOException If failed.
+         */
+        public InStream(File f) throws IOException {
+            file = new RandomAccessFile(f, "r");
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized int read() throws IOException {
+            return file.read();
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
+            return file.read(b, off, len);
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void close() throws IOException {
+            file.close();
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
+            long pos0 = file.getFilePointer();
+
+            file.seek(pos);
+            int res = file.read(buf, off, len);
+
+            file.seek(pos0);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+            if (read(pos, buf, off, len) != len)
+                throw new IOException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFully(long pos, byte[] buf) throws IOException {
+            readFully(pos, buf, 0, buf.length);
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void seek(long pos) throws IOException {
+            file.seek(pos);
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized long getPos() throws IOException {
+            return file.getFilePointer();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean seekToNewSource(long targetPos) throws IOException {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
new file mode 100644
index 0000000..b3cb235
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Facade for communication with grid.
+ */
+public interface HadoopIgfs {
+    /**
+     * Perform handshake.
+     *
+     * @param logDir Log directory.
+     * @return Handshake response.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException;
+
+    /**
+     * Close connection.
+     *
+     * @param force Force flag.
+     */
+    public void close(boolean force);
+
+    /**
+     * Command to retrieve file info for some IGFS path.
+     *
+     * @param path Path to get file info for.
+     * @return File info.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to update file properties.
+     *
+     * @param path IGFS path to update properties.
+     * @param props Properties to update.
+     * @return File info returned by the update operation.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
+
+    /**
+     * Sets last access time and last modification time for a file.
+     *
+     * @param path Path to update times.
+     * @param accessTime Last access time to set.
+     * @param modificationTime Last modification time to set.
+     * @return Result flag of the operation (exact semantics are defined by the implementation).
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Command to rename given path.
+     *
+     * @param src Source path.
+     * @param dest Destination path.
+     * @return Result flag of the rename operation (exact semantics are defined by the implementation).
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to delete given path.
+     *
+     * @param path Path to delete.
+     * @param recursive {@code True} if deletion is recursive.
+     * @return Result flag of the delete operation (exact semantics are defined by the implementation).
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to get affinity for given path, offset and length.
+     *
+     * @param path Path to get affinity for.
+     * @param start Start position (offset).
+     * @param len Data length.
+     * @return Block locations for the requested range.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Gets path summary.
+     *
+     * @param path Path to get summary for.
+     * @return Path summary.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to create directories.
+     *
+     * @param path Path to create.
+     * @param props Properties passed along with the request (presumably applied to the created directories).
+     * @return Result flag of the mkdirs operation (exact semantics are defined by the implementation).
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to get list of files in directory.
+     *
+     * @param path Path to list.
+     * @return Files in the directory.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to get directory listing.
+     *
+     * @param path Path to list.
+     * @return Paths in the directory.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException;
+
+    /**
+     * Performs status request.
+     *
+     * @return Status response.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public IgfsStatus fsStatus() throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to open file for reading.
+     *
+     * @param path File path to open.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to open file for reading, additionally passing {@code seqReadsBeforePrefetch}.
+     *
+     * @param path File path to open.
+     * @param seqReadsBeforePrefetch Number of sequential reads before prefetch (as the parameter name suggests).
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Command to create file and open it for output.
+     *
+     * @param path Path to file.
+     * @param overwrite If {@code true} then old file contents will be lost.
+     * @param colocate If {@code true} and called on data node, file will be written on that node.
+     * @param replication Replication factor.
+     * @param blockSize Block size.
+     * @param props File properties for creation.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
+        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
+
+    /**
+     * Open file for output appending data to the end of a file.
+     *
+     * @param path Path to file.
+     * @param create If {@code true}, file will be created if does not exist.
+     * @param props File properties.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If failed with an I/O error.
+     */
+    public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
+        @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
new file mode 100644
index 0000000..ff69478
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.*;
+
+/**
+ * Communication exception indicating a problem between file system and IGFS instance.
+ */
+public class HadoopIgfsCommunicationException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates new exception with given throwable as a nested cause and
+     * source of error message.
+     *
+     * @param cause Non-null throwable cause.
+     */
+    public HadoopIgfsCommunicationException(Exception cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates a new exception with given error message and no nested cause.
+     *
+     * @param msg Error message.
+     */
+    public HadoopIgfsCommunicationException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates a new exception with given error message and nested cause exception.
+     *
+     * @param msg Error message.
+     * @param cause Cause.
+     */
+    public HadoopIgfsCommunicationException(String msg, Exception cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
new file mode 100644
index 0000000..7502f57
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+
+/**
+ * IGFS endpoint abstraction.
+ */
+public class HadoopIgfsEndpoint {
+    /** Localhost. */
+    public static final String LOCALHOST = "127.0.0.1";
+
+    /** IGFS name. */
+    private final String igfsName;
+
+    /** Grid name. */
+    private final String gridName;
+
+    /** Host. */
+    private final String host;
+
+    /** Port. */
+    private final int port;
+
+    /**
+     * Normalize IGFS URI: the authority is parsed as an endpoint connection string and the URI is rebuilt
+     * with explicit host and port, so omitted parts are replaced with their defaults.
+     *
+     * @param uri URI.
+     * @return Normalized URI.
+     * @throws IOException If URI has non-IGFS scheme, its authority cannot be parsed or the normalized URI
+     *      cannot be constructed.
+     */
+    public static URI normalize(URI uri) throws IOException {
+        try {
+            if (!F.eq(IgniteFileSystem.IGFS_SCHEME, uri.getScheme()))
+                throw new IOException("Failed to normalize URI because it has non IGFS scheme: " + uri);
+
+            HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(uri.getAuthority());
+
+            // User info part of the normalized URI: "igfs[:grid]", or absent if neither name is set.
+            StringBuilder sb = new StringBuilder();
+
+            if (endpoint.igfs() != null)
+                sb.append(endpoint.igfs());
+
+            if (endpoint.grid() != null)
+                sb.append(":").append(endpoint.grid());
+
+            return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(),
+                uri.getPath(), uri.getQuery(), uri.getFragment());
+        }
+        catch (URISyntaxException | IgniteCheckedException e) {
+            throw new IOException("Failed to normalize URI: " + uri, e);
+        }
+    }
+
+    /**
+     * Constructor.
+     * <p>
+     * Accepted connection string format is {@code [igfs[:grid]@][host[:port]]}. Empty or missing IGFS and grid
+     * names are treated as {@code null}, empty host is replaced with {@link #LOCALHOST} and missing port is
+     * replaced with the default IPC port.
+     *
+     * @param connStr Connection string, {@code null} is treated as empty string.
+     * @throws IgniteCheckedException If failed to parse connection string.
+     */
+    public HadoopIgfsEndpoint(@Nullable String connStr) throws IgniteCheckedException {
+        if (connStr == null)
+            connStr = "";
+
+        // Negative limit keeps trailing empty tokens, so "a@" and "a@b@" are told apart.
+        String[] tokens = connStr.split("@", -1);
+
+        IgniteBiTuple<String, Integer> hostPort;
+
+        if (tokens.length == 1) {
+            igfsName = null;
+            gridName = null;
+
+            hostPort = hostPort(connStr, connStr);
+        }
+        else if (tokens.length == 2) {
+            String authStr = tokens[0];
+
+            if (authStr.isEmpty()) {
+                gridName = null;
+                igfsName = null;
+            }
+            else {
+                String[] authTokens = authStr.split(":", -1);
+
+                igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0];
+
+                if (authTokens.length == 1)
+                    gridName = null;
+                else if (authTokens.length == 2)
+                    gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1];
+                else
+                    throw new IgniteCheckedException("Invalid connection string format: " + connStr);
+            }
+
+            hostPort = hostPort(connStr, tokens[1]);
+        }
+        else
+            throw new IgniteCheckedException("Invalid connection string format: " + connStr);
+
+        host = hostPort.get1();
+
+        assert hostPort.get2() != null;
+
+        port = hostPort.get2();
+    }
+
+    /**
+     * Parse host and port.
+     * <p>
+     * Static on purpose: the method is called from the constructor and must not depend on instance state.
+     *
+     * @param connStr Full connection string (used in error messages only).
+     * @param hostPortStr Host/port connection string part.
+     * @return Tuple with host and port.
+     * @throws IgniteCheckedException If failed to parse connection string.
+     */
+    private static IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr)
+        throws IgniteCheckedException {
+        String[] tokens = hostPortStr.split(":", -1);
+
+        String host = tokens[0];
+
+        if (F.isEmpty(host))
+            host = LOCALHOST;
+
+        int port;
+
+        if (tokens.length == 1)
+            port = DFLT_IPC_PORT;
+        else if (tokens.length == 2) {
+            String portStr = tokens[1];
+
+            try {
+                port = Integer.parseInt(portStr);
+            }
+            catch (NumberFormatException e) {
+                // Keep the original exception as a cause so the offending token is not lost.
+                throw new IgniteCheckedException("Invalid port number: " + connStr, e);
+            }
+
+            if (port < 0 || port > 65535)
+                throw new IgniteCheckedException("Invalid port number: " + connStr);
+        }
+        else
+            throw new IgniteCheckedException("Invalid connection string format: " + connStr);
+
+        return F.t(host, port);
+    }
+
+    /**
+     * @return IGFS name.
+     */
+    @Nullable public String igfs() {
+        return igfsName;
+    }
+
+    /**
+     * @return Grid name.
+     */
+    @Nullable public String grid() {
+        return gridName;
+    }
+
+    /**
+     * @return Host.
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * @return {@code True} if host is equal to {@link #LOCALHOST}.
+     */
+    public boolean isLocal() {
+        return F.eq(LOCALHOST, host);
+    }
+
+    /**
+     * @return Port.
+     */
+    public int port() {
+        return port;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopIgfsEndpoint.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
new file mode 100644
index 0000000..2200e78
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Extended IGFS server interface.
+ */
+public interface HadoopIgfsEx extends HadoopIgfs {
+    /**
+     * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
+     * If connection is closed already, callback will be invoked synchronously inside this method.
+     *
+     * @param delegate Stream delegate.
+     * @param lsnr Event listener.
+     */
+    public void addEventListener(HadoopIgfsStreamDelegate delegate, HadoopIgfsStreamEventListener lsnr);
+
+    /**
+     * Removes event listener that will be invoked when connection with server is lost or remote error has occurred.
+     *
+     * @param delegate Stream delegate.
+     */
+    public void removeEventListener(HadoopIgfsStreamDelegate delegate);
+
+    /**
+     * Asynchronously reads specified amount of bytes from opened input stream.
+     *
+     * @param delegate Stream delegate.
+     * @param pos Position to read from.
+     * @param len Data length to read.
+     * @param outBuf Optional output buffer. If buffer length is less than {@code len}, all remaining
+     *     bytes will be read into new allocated buffer of length {@code len - outBuf.length} and this buffer will
+     *     be the result of read future.
+     * @param outOff Output offset.
+     * @param outLen Output length.
+     * @return Future with read data.
+     */
+    public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
+        @Nullable final byte[] outBuf, final int outOff, final int outLen);
+
+    /**
+     * Writes data to the stream with given streamId. This method does not return any future since
+     * no response to write request is sent.
+     *
+     * @param delegate Stream delegate.
+     * @param data Data to write.
+     * @param off Offset.
+     * @param len Length.
+     * @throws IOException If failed.
+     */
+    public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) throws IOException;
+
+    /**
+     * Close server stream.
+     *
+     * @param delegate Stream delegate.
+     * @throws IOException If failed.
+     */
+    public void closeStream(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+    /**
+     * Flush output stream.
+     *
+     * @param delegate Stream delegate.
+     * @throws IOException If failed.
+     */
+    public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
new file mode 100644
index 0000000..59a8f49
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * IGFS client future carrying an optional output buffer (with offset and length) and a flag
+ * marking it as a read future.
+ *
+ * @param <T> Future result type.
+ */
+public class HadoopIgfsFuture<T> extends GridPlainFutureAdapter<T> {
+    /** Output buffer. */
+    private byte[] outBuf;
+
+    /** Output offset. */
+    private int outOff;
+
+    /** Output length. */
+    private int outLen;
+
+    /** Read future flag. */
+    private boolean read;
+
+    /**
+     * @return Output buffer, {@code null} unless one was set through {@link #outputBuffer(byte[])}.
+     */
+    public byte[] outputBuffer() {
+        return outBuf;
+    }
+
+    /**
+     * @param outBuf Output buffer, may be {@code null}.
+     */
+    public void outputBuffer(@Nullable byte[] outBuf) {
+        this.outBuf = outBuf;
+    }
+
+    /**
+     * @return Offset in output buffer to write from.
+     */
+    public int outputOffset() {
+        return outOff;
+    }
+
+    /**
+     * @param outOff Offset in output buffer to write from.
+     */
+    public void outputOffset(int outOff) {
+        this.outOff = outOff;
+    }
+
+    /**
+     * @return Length to write to output buffer.
+     */
+    public int outputLength() {
+        return outLen;
+    }
+
+    /**
+     * @param outLen Length to write to output buffer.
+     */
+    public void outputLength(int outLen) {
+        this.outLen = outLen;
+    }
+
+    /**
+     * @param read {@code True} if this is a read future.
+     */
+    public void read(boolean read) {
+        this.read = read;
+    }
+
+    /**
+     * @return {@code True} if this is a read future.
+     */
+    public boolean read() {
+        return read;
+    }
+}


Mime
View raw message