ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [45/92] [abbrv] [partial] ignite git commit: Moving classes around.
Date Wed, 21 Sep 2016 14:53:37 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopLongCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopLongCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopLongCounter.java
new file mode 100644
index 0000000..0d61e0d
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopLongCounter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Standard hadoop counter to use via original Hadoop API in Hadoop jobs.
+ */
+public class HadoopLongCounter extends HadoopCounterAdapter {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Current counter value; written by {@link #writeValue} and restored by {@link #readValue}. */
+    private long val;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    public HadoopLongCounter() {
+        // No-op.
+    }
+
+    /**
+     * Creates a counter with the given group and name; both are passed to the superclass constructor.
+     *
+     * @param grp Group name.
+     * @param name Counter name.
+     */
+    public HadoopLongCounter(String grp, String name) {
+        super(grp, name);
+    }
+
+    /** {@inheritDoc} Writes the value as a single {@code long}. */
+    @Override protected void writeValue(ObjectOutput out) throws IOException {
+        out.writeLong(val);
+    }
+
+    /** {@inheritDoc} Reads the value back as a single {@code long}. */
+    @Override protected void readValue(ObjectInput in) throws IOException {
+        val = in.readLong();
+    }
+
+    /** {@inheritDoc} Adds the other counter's value to this one; {@code cntr} is cast to this class without a type check. */
+    @Override public void merge(HadoopCounter cntr) {
+        val += ((HadoopLongCounter)cntr).val;
+    }
+
+    /**
+     * Gets current value of this counter.
+     *
+     * @return Current value.
+     */
+    public long value() {
+        return val;
+    }
+
+    /**
+     * Replaces the current value of this counter with the given one.
+     *
+     * @param val Value to set.
+     */
+    public void value(long val) {
+        this.val = val;
+    }
+
+    /**
+     * Increments this counter by the given value; no overflow check is performed.
+     *
+     * @param i Value to increase this counter by.
+     */
+    public void increment(long i) {
+        val += i;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopPerformanceCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopPerformanceCounter.java
new file mode 100644
index 0000000..b35b7fb
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/counter/HadoopPerformanceCounter.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.JOB_SUBMISSION_START_TS_PROPERTY;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.REQ_NEW_JOBID_TS_PROPERTY;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.RESPONSE_NEW_JOBID_TS_PROPERTY;
+
+/**
+ * Counter for the job statistics accumulation.
+ */
+public class HadoopPerformanceCounter extends HadoopCounterAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** The group name for this counter. */
+    private static final String GROUP_NAME = "SYSTEM";
+
+    /** The counter name for this counter. */
+    private static final String COUNTER_NAME = "PERFORMANCE";
+
+    /** Events collections. */
+    private Collection<T2<String,Long>> evts = new ArrayList<>();
+
+    /** Node id to insert into the event info. */
+    private UUID nodeId;
+
+    /** */
+    private int reducerNum;
+
+    /** */
+    private volatile Long firstShuffleMsg;
+
+    /** */
+    private volatile Long lastShuffleMsg;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    public HadoopPerformanceCounter() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param grp Group name.
+     * @param name Counter name.
+     */
+    public HadoopPerformanceCounter(String grp, String name) {
+        super(grp, name);
+    }
+
+    /**
+     * Constructor to create instance to use this as helper.
+     *
+     * @param nodeId Id of the work node.
+     */
+    public HadoopPerformanceCounter(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeValue(ObjectOutput out) throws IOException {
+        U.writeCollection(out, evts);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readValue(ObjectInput in) throws IOException {
+        try {
+            evts = U.readCollection(in);
+        }
+        catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void merge(HadoopCounter cntr) {
+        evts.addAll(((HadoopPerformanceCounter)cntr).evts);
+    }
+
+    /**
+     * Gets the events collection.
+     *
+     * @return Collection of event.
+     */
+    public Collection<T2<String, Long>> evts() {
+        return evts;
+    }
+
+    /**
+     * Generate name that consists of some event information.
+     *
+     * @param info Task info.
+     * @param evtType The type of the event.
+     * @return String contains necessary event information.
+     */
+    private String eventName(HadoopTaskInfo info, String evtType) {
+        return eventName(info.type().toString(), info.taskNumber(), evtType);
+    }
+
+    /**
+     * Generate name that consists of some event information.
+     *
+     * @param taskType Task type.
+     * @param taskNum Number of the task.
+     * @param evtType The type of the event.
+     * @return String contains necessary event information.
+     */
+    private String eventName(String taskType, int taskNum, String evtType) {
+        assert nodeId != null;
+
+        return taskType + " " + taskNum + " " + evtType + " " + nodeId;
+    }
+
+    /**
+     * Adds event of the task submission (task instance creation).
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskSubmit(HadoopTaskInfo info, long ts) {
+        evts.add(new T2<>(eventName(info, "submit"), ts));
+    }
+
+    /**
+     * Adds event of the task preparation.
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskPrepare(HadoopTaskInfo info, long ts) {
+        evts.add(new T2<>(eventName(info, "prepare"), ts));
+    }
+
+    /**
+     * Adds event of the task finish.
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskFinish(HadoopTaskInfo info, long ts) {
+        if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) {
+            evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg));
+            evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg));
+
+            lastShuffleMsg = null;
+        }
+
+        evts.add(new T2<>(eventName(info, "finish"), ts));
+    }
+
+    /**
+     * Adds event of the task run.
+     *
+     * @param info Task info.
+     * @param ts Timestamp of the event.
+     */
+    public void onTaskStart(HadoopTaskInfo info, long ts) {
+        evts.add(new T2<>(eventName(info, "start"), ts));
+    }
+
+    /**
+     * Adds event of the job preparation.
+     *
+     * @param ts Timestamp of the event.
+     */
+    public void onJobPrepare(long ts) {
+        assert nodeId != null;
+
+        evts.add(new T2<>("JOB prepare " + nodeId, ts));
+    }
+
+    /**
+     * Adds event of the job start.
+     *
+     * @param ts Timestamp of the event.
+     */
+    public void onJobStart(long ts) {
+        assert nodeId != null;
+
+        evts.add(new T2<>("JOB start " + nodeId, ts));
+    }
+
+    /**
+     * Adds client submission events from job info.
+     *
+     * @param info Job info.
+     */
+    public void clientSubmissionEvents(HadoopJobInfo info) {
+        assert nodeId != null;
+
+        addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY);
+        addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY);
+        addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY);
+    }
+
+    /**
+     * Adds event with timestamp from some property in job info.
+     *
+     * @param evt Event type and phase.
+     * @param info Job info.
+     * @param propName Property name to get timestamp.
+     */
+    private void addEventFromProperty(String evt, HadoopJobInfo info, String propName) {
+        String val = info.property(propName);
+
+        if (!F.isEmpty(val)) {
+            try {
+                evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val)));
+            }
+            catch (NumberFormatException e) {
+                throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e);
+            }
+        }
+    }
+
+    /**
+     * Registers shuffle message event.
+     *
+     * @param reducerNum Number of reducer that receives the data.
+     * @param ts Timestamp of the event.
+     */
+    public void onShuffleMessage(int reducerNum, long ts) {
+        this.reducerNum = reducerNum;
+
+        if (firstShuffleMsg == null)
+            firstShuffleMsg = ts;
+
+        lastShuffleMsg = ts;
+    }
+
+    /**
+     * Gets system predefined performance counter from the HadoopCounters object.
+     *
+     * @param cntrs HadoopCounters object.
+     * @param nodeId Node id for methods that adds events. It may be null if you don't use ones.
+     * @return Predefined performance counter.
+     */
+    public static HadoopPerformanceCounter getCounter(HadoopCounters cntrs, @Nullable UUID nodeId) {
+        HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class);
+
+        if (nodeId != null)
+            cntr.nodeId(nodeId);
+
+        return cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class);
+    }
+
+    /**
+     * Sets the nodeId field.
+     *
+     * @param nodeId Node id.
+     */
+    private void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..e348820
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.hadoop.util.UserNameMapper;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Arrays;
+
+/**
+ * Basic Hadoop file system factory delegate.
+ */
+public class HadoopBasicFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate {
+    /** Proxy. */
+    protected final HadoopFileSystemFactory proxy;
+
+    /** Configuration of the secondary filesystem; assigned in {@link #start()} and asserted non-null before use. */
+    protected Configuration cfg;
+
+    /** Resulting URI: the proxy's explicit URI if set, otherwise the default URI of {@link #cfg}. */
+    protected URI fullUri;
+
+    /** User name mapper taken from the proxy in {@link #start()}; may be {@code null}. */
+    private UserNameMapper usrNameMapper;
+
+    /**
+     * Constructor.
+     *
+     * @param proxy Proxy.
+     */
+    public HadoopBasicFileSystemFactoryDelegate(BasicHadoopFileSystemFactory proxy) {
+        this.proxy = proxy;
+    }
+
+    /** {@inheritDoc} Fixes the user name, applies the optional mapper (fixing its result too), then calls {@link #getWithMappedName}. */
+    @Override public FileSystem get(String name) throws IOException {
+        String name0 = IgfsUtils.fixUserName(name);
+
+        if (usrNameMapper != null)
+            name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0));
+
+        return getWithMappedName(name0);
+    }
+
+    /**
+     * Internal file system create routine; an interrupt is restored on the thread and reported as {@link IOException}.
+     *
+     * @param usrName User name.
+     * @return File system.
+     * @throws IOException If failed.
+     */
+    protected FileSystem getWithMappedName(String usrName) throws IOException {
+        assert cfg != null;
+
+        try {
+            // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation.
+            // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context
+            // classloader to classloader of current class to avoid strange class-cast-exceptions.
+            ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
+
+            try {
+                return create(usrName);
+            }
+            finally {
+                HadoopUtils.restoreContextClassLoader(oldLdr);
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IOException("Failed to create file system due to interrupt.", e);
+        }
+    }
+
+    /**
+     * Internal file system creation routine, invoked in correct class loader context.
+     *
+     * @param usrName User name.
+     * @return File system.
+     * @throws IOException If failed.
+     * @throws InterruptedException if the current thread is interrupted.
+     */
+    protected FileSystem create(String usrName) throws IOException, InterruptedException {
+        return FileSystem.get(fullUri, cfg, usrName);
+    }
+
+    /** {@inheritDoc} Builds the configuration from the proxy's config paths, resolves the URI and starts the user name mapper. */
+    @Override public void start() throws IgniteException {
+        BasicHadoopFileSystemFactory proxy0 = (BasicHadoopFileSystemFactory)proxy;
+
+        cfg = HadoopUtils.safeCreateConfiguration();
+
+        if (proxy0.getConfigPaths() != null) {
+            for (String cfgPath : proxy0.getConfigPaths()) {
+                if (cfgPath == null)
+                    throw new NullPointerException("Configuration path cannot be null: " +
+                        Arrays.toString(proxy0.getConfigPaths()));
+                else {
+                    URL url = U.resolveIgniteUrl(cfgPath);
+
+                    if (url == null) {
+                        // A configured path must be resolvable:
+                        throw new IgniteException("Failed to resolve secondary file system configuration path " +
+                            "(ensure that it exists locally and you have read access to it): " + cfgPath);
+                    }
+
+                    cfg.addResource(url);
+                }
+            }
+        }
+
+        // If secondary fs URI is not given explicitly, try to get it from the configuration:
+        if (proxy0.getUri() == null)
+            fullUri = FileSystem.getDefaultUri(cfg);
+        else {
+            try {
+                fullUri = new URI(proxy0.getUri());
+            }
+            catch (URISyntaxException use) {
+                throw new IgniteException("Failed to resolve secondary file system URI: " + proxy0.getUri(), use);
+            }
+        }
+
+        usrNameMapper = proxy0.getUserNameMapper();
+
+        if (usrNameMapper instanceof LifecycleAware) // instanceof is false for null, no separate null check needed.
+            ((LifecycleAware)usrNameMapper).start();
+    }
+
+    /** {@inheritDoc} Stops the user name mapper if it is {@link LifecycleAware}. */
+    @Override public void stop() throws IgniteException {
+        if (usrNameMapper instanceof LifecycleAware) // instanceof is false for null, no separate null check needed.
+            ((LifecycleAware)usrNameMapper).stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..04bbeb8
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
+
+import java.io.IOException;
+
+/**
+ * Caching Hadoop file system factory delegate.
+ */
+public class HadoopCachingFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate {
+    /** Per-user file system cache; missing entries are created through the parent's {@code getWithMappedName}. */
+    private final HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
+        new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
+            @Override public FileSystem createValue(String key) throws IOException {
+                return HadoopCachingFileSystemFactoryDelegate.super.getWithMappedName(key);
+            }
+        }
+    );
+
+    /**
+     * Creates the delegate; the proxy is passed to the superclass constructor.
+     *
+     * @param proxy Proxy.
+     */
+    public HadoopCachingFileSystemFactoryDelegate(CachingHadoopFileSystemFactory proxy) {
+        super(proxy);
+    }
+
+    /** {@inheritDoc} Returns the file system from the per-user cache, creating it on first request. */
+    @Override public FileSystem getWithMappedName(String name) throws IOException {
+        return cache.getOrCreate(name);
+    }
+
+    /** {@inheritDoc} Runs the parent start logic, then adjusts the configuration it produced. */
+    @Override public void start() throws IgniteException {
+        super.start();
+
+        // Set the "disable FS cache" property for this URI scheme to true; file systems are cached per user by this delegate.
+        cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true);
+    }
+
+    /** {@inheritDoc} Runs the parent stop logic, then closes the cache; a checked failure is rethrown as {@link IgniteException}. */
+    @Override public void stop() throws IgniteException {
+        super.stop();
+
+        try {
+            cache.close();
+        }
+        catch (IgniteCheckedException ice) {
+            throw new IgniteException(ice);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..3eb6239
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+import java.io.IOException;
+
+/**
+ * Hadoop file system factory delegate for non-standard factories.
+ */
+public class HadoopDefaultFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate {
+    /** Wrapped factory that all calls are forwarded to; asserted non-null in the constructor. */
+    private final HadoopFileSystemFactory factory;
+
+    /**
+     * Creates a delegate around the given factory.
+     *
+     * @param factory Factory.
+     */
+    public HadoopDefaultFileSystemFactoryDelegate(HadoopFileSystemFactory factory) {
+        assert factory != null;
+
+        this.factory = factory;
+    }
+
+    /** {@inheritDoc} Forwards to the wrapped factory and casts its result to {@link FileSystem}. */
+    @Override public FileSystem get(String usrName) throws IOException {
+        return (FileSystem)factory.get(usrName);
+    }
+
+    /** {@inheritDoc} Starts the wrapped factory if it is {@link LifecycleAware}; otherwise does nothing. */
+    @Override public void start() throws IgniteException {
+        if (factory instanceof LifecycleAware)
+            ((LifecycleAware)factory).start();
+    }
+
+    /** {@inheritDoc} Stops the wrapped factory if it is {@link LifecycleAware}; otherwise does nothing. */
+    @Override public void stop() throws IgniteException {
+        if (factory instanceof LifecycleAware)
+            ((LifecycleAware)factory).stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
new file mode 100644
index 0000000..ae1b244
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.T2;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+/**
+ * Counter writer delegate implementation.
+ */
+@SuppressWarnings("unused")
+public class HadoopFileSystemCounterWriterDelegateImpl implements HadoopFileSystemCounterWriterDelegate {
+    /** Placeholder in the counter directory path that is replaced with the job user name. */
+    private static final String USER_MACRO = "${USER}";
+
+    /** Directory used when the job does not set {@link IgniteHadoopFileSystemCounterWriter#COUNTER_WRITER_DIR_PROPERTY}. */
+    private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO;
+
+    /**
+     * Constructor.
+     *
+     * @param proxy Proxy (not used).
+     */
+    public HadoopFileSystemCounterWriterDelegateImpl(IgniteHadoopFileSystemCounterWriter proxy) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} Writes every performance event as a {@code name:timestamp} line into a file under the per-job directory. */
+    @Override public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException {
+        Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
+
+        final HadoopJobInfo jobInfo = job.info();
+
+        final HadoopJobId jobId = job.id();
+
+        for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
+            hadoopCfg.set(e.getKey(), e.getValue());
+
+        String user = jobInfo.user();
+
+        user = IgfsUtils.fixUserName(user);
+
+        String dir = jobInfo.property(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY);
+
+        if (dir == null)
+            dir = DEFAULT_COUNTER_WRITER_DIR;
+
+        Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString());
+
+        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
+
+        try {
+            hadoopCfg.set(MRJobConfig.USER_NAME, user);
+
+            FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg);
+
+            fs.mkdirs(jobStatPath);
+
+            try (PrintStream out = new PrintStream(fs.create(
+                new Path(jobStatPath, IgniteHadoopFileSystemCounterWriter.PERFORMANCE_COUNTER_FILE_NAME)))) {
+                for (T2<String, Long> evt : perfCntr.evts()) {
+                    out.print(evt.get1());
+                    out.print(':');
+                    out.println(evt.get2().toString());
+                }
+
+                out.flush();
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
new file mode 100644
index 0000000..7f7100d
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
+import org.apache.ignite.igfs.IgfsException;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
+import org.apache.ignite.igfs.IgfsPathNotFoundException;
+import org.apache.ignite.igfs.IgfsUserContext;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Secondary file system implementation.
+ */
+@SuppressWarnings("unused")
+public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSecondaryFileSystemDelegate {
+    /** The default user name. It is used if no user context is set. */
+    private final String dfltUsrName;
+
+    /** Factory. */
+    private final HadoopFileSystemFactoryDelegate factory;
+
+    /**
+     * Creates the delegate for the given secondary file system proxy: remembers the (fixed) default
+     * user name and wraps the proxy's file system factory into a factory delegate.
+     * <p>
+     * NOTE(review): the assertion requires the proxy's factory to be non-null, yet a
+     * {@link CachingHadoopFileSystemFactory} fallback follows for the {@code null} case; that fallback
+     * is reachable only when assertions are disabled. Confirm which behavior is intended.
+     *
+     * @param proxy Proxy.
+     */
+    public HadoopIgfsSecondaryFileSystemDelegateImpl(IgniteHadoopIgfsSecondaryFileSystem proxy) {
+        assert proxy.getFileSystemFactory() != null;
+
+        dfltUsrName = IgfsUtils.fixUserName(proxy.getDefaultUserName());
+
+        HadoopFileSystemFactory proxyFactory = proxy.getFileSystemFactory();
+
+        factory = HadoopDelegateUtils.fileSystemFactoryDelegate(
+            proxyFactory != null ? proxyFactory : new CachingHadoopFileSystemFactory());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists(IgfsPath path) {
+        try {
+            FileSystem fs = fileSystemForUser();
+
+            Path hadoopPath = convert(path);
+
+            return fs.exists(hadoopPath);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
+        HadoopIgfsProperties parsed = new HadoopIgfsProperties(props);
+
+        final FileSystem fs = fileSystemForUser();
+
+        try {
+            // Ownership is changed only when at least one of user/group is supplied.
+            boolean ownerGiven = parsed.userName() != null || parsed.groupName() != null;
+
+            if (ownerGiven)
+                fs.setOwner(convert(path), parsed.userName(), parsed.groupName());
+
+            boolean permGiven = parsed.permission() != null;
+
+            if (permGiven)
+                fs.setPermission(convert(path), parsed.permission());
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
+        }
+
+        // Result is not used in case of secondary FS, hence null.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rename(IgfsPath src, IgfsPath dest) {
+        try {
+            // The operation is fully delegated to the secondary file system.
+            FileSystem fs = fileSystemForUser();
+
+            boolean renamed = fs.rename(convert(src), convert(dest));
+
+            // A "false" result is reported as a failure as well, not only an I/O exception.
+            if (!renamed)
+                throw new IgfsException("Failed to rename (secondary file system returned false) " +
+                    "[src=" + src + ", dest=" + dest + ']');
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(IgfsPath path, boolean recursive) {
+        try {
+            FileSystem fs = fileSystemForUser();
+
+            boolean deleted = fs.delete(convert(path), recursive);
+
+            return deleted;
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path) {
+        try {
+            boolean created = fileSystemForUser().mkdirs(convert(path));
+
+            // A "false" result from the secondary file system is treated as a failure.
+            if (!created)
+                throw new IgniteException("Failed to make directories [path=" + path + "]");
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
+        try {
+            FileSystem fs = fileSystemForUser();
+
+            Path dir = convert(path);
+
+            // "props" is declared nullable: substitute an empty map before parsing, the same way
+            // create(IgfsPath, int, boolean, int, long, Map) of this class does, instead of handing
+            // a null map to HadoopIgfsProperties.
+            FsPermission perm = new HadoopIgfsProperties(
+                props != null ? props : Collections.<String, String>emptyMap()).permission();
+
+            // A "false" result from the secondary file system is treated as a failure.
+            if (!fs.mkdirs(dir, perm))
+                throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
+        try {
+            FileStatus[] children = fileSystemForUser().listStatus(convert(path));
+
+            // A null listing is reported the same way as a missing path.
+            if (children == null)
+                throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+
+            Collection<IgfsPath> paths = new ArrayList<>(children.length);
+
+            for (int i = 0; i < children.length; i++) {
+                String childName = children[i].getPath().getName();
+
+                // Child paths are rebuilt relative to the requested IGFS path.
+                paths.add(new IgfsPath(path, childName));
+            }
+
+            return paths;
+        }
+        catch (FileNotFoundException ignored) {
+            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
+        try {
+            FileStatus[] entries = fileSystemForUser().listStatus(convert(path));
+
+            // A null listing is reported the same way as a missing path.
+            if (entries == null)
+                throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+
+            Collection<IgfsFile> files = new ArrayList<>(entries.length);
+
+            for (int i = 0; i < entries.length; i++) {
+                FileStatus entry = entries[i];
+
+                IgfsEntryInfo info;
+
+                if (entry.isDirectory()) {
+                    info = IgfsUtils.createDirectory(
+                        IgniteUuid.randomUuid(),
+                        null,
+                        properties(entry),
+                        entry.getAccessTime(),
+                        entry.getModificationTime()
+                    );
+                }
+                else {
+                    info = IgfsUtils.createFile(
+                        IgniteUuid.randomUuid(),
+                        (int)entry.getBlockSize(), // Hadoop block size is long; narrowed to int here.
+                        entry.getLen(),
+                        null,
+                        null,
+                        false,
+                        properties(entry),
+                        entry.getAccessTime(),
+                        entry.getModificationTime()
+                    );
+                }
+
+                // Child paths are rebuilt relative to the requested IGFS path.
+                files.add(new IgfsFileImpl(new IgfsPath(path, entry.getPath().getName()), info, 1));
+            }
+
+            return files;
+        }
+        catch (FileNotFoundException ignored) {
+            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
+        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
+        try {
+            FileSystem fs = fileSystemForUser();
+
+            OutputStream out = fs.create(convert(path), overwrite);
+
+            return out;
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
+        long blockSize, @Nullable Map<String, String> props) {
+        // Null properties are parsed as an empty map.
+        Map<String, String> safeProps = props != null ? props : Collections.<String, String>emptyMap();
+
+        HadoopIgfsProperties parsed = new HadoopIgfsProperties(safeProps);
+
+        try {
+            FileSystem fs = fileSystemForUser();
+
+            // Replication is narrowed to short, as required by the Hadoop call; no progress callback is passed.
+            return fs.create(convert(path), parsed.permission(), overwrite, bufSize,
+                (short) replication, blockSize, null);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
+                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
+                ", blockSize=" + blockSize + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
+        @Nullable Map<String, String> props) {
+        // NOTE(review): "create" and "props" are not consulted here; only path and buffer size
+        // reach the secondary file system. Confirm that this is intended.
+        try {
+            FileSystem fs = fileSystemForUser();
+
+            OutputStream out = fs.append(convert(path), bufSize);
+
+            return out;
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(final IgfsPath path) {
+        final FileStatus st;
+
+        try {
+            st = fileSystemForUser().getFileStatus(convert(path));
+        }
+        catch (FileNotFoundException ignore) {
+            // A missing path is reported as "no info" rather than as an error.
+            return null;
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
+        }
+
+        if (st == null)
+            return null;
+
+        final Map<String, String> attrs = properties(st);
+
+        // Lightweight view over the Hadoop file status; every getter reads the captured status/attributes.
+        return new IgfsFile() {
+            @Override public IgfsPath path() {
+                return path;
+            }
+
+            @Override public boolean isFile() {
+                return st.isFile();
+            }
+
+            @Override public boolean isDirectory() {
+                return st.isDirectory();
+            }
+
+            @Override public int blockSize() {
+                // By convention directory has blockSize == 0, while file has blockSize > 0.
+                if (isDirectory())
+                    return 0;
+
+                return (int)st.getBlockSize();
+            }
+
+            @Override public long groupBlockSize() {
+                return st.getBlockSize();
+            }
+
+            @Override public long accessTime() {
+                return st.getAccessTime();
+            }
+
+            @Override public long modificationTime() {
+                return st.getModificationTime();
+            }
+
+            @Override public String property(String name) throws IllegalArgumentException {
+                String val = attrs.get(name);
+
+                if (val != null)
+                    return val;
+
+                throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
+            }
+
+            @Nullable @Override public String property(String name, @Nullable String dfltVal) {
+                String val = attrs.get(name);
+
+                if (val == null)
+                    return dfltVal;
+
+                return val;
+            }
+
+            @Override public long length() {
+                return st.getLen();
+            }
+
+            /** {@inheritDoc} */
+            @Override public Map<String, String> properties() {
+                return attrs;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public long usedSpaceSize() {
+        try {
+            FileSystem fs = fileSystemForUser();
+
+            // FileSystem#getUsed() is not used since it counts only the files in the filesystem root,
+            // not all the files recursively; the content summary of the root is queried instead.
+            Path root = new Path("/");
+
+            return fs.getContentSummary(root).getSpaceConsumed();
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
+        try {
+            // Hadoop's FileSystem#setTimes takes (path, mtime, atime), i.e. modification time first,
+            // which is the reverse of this method's (accessTime, modificationTime) parameter order.
+            fileSystemForUser().setTimes(convert(path), modificationTime, accessTime);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed set times for path: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    public void start() {
+        // Start-up is delegated to the file system factory delegate.
+        // NOTE(review): the Javadoc inherits documentation, but no @Override is present - confirm the supertype.
+        factory.start();
+    }
+
+    /** {@inheritDoc} */
+    public void stop() {
+        // Shutdown is delegated to the file system factory delegate.
+        // NOTE(review): the Javadoc inherits documentation, but no @Override is present - confirm the supertype.
+        factory.stop();
+    }
+
+    /**
+     * Converts an IGFS path into a Hadoop path that carries the scheme and authority
+     * of the current user's file system URI.
+     *
+     * @param path IGFS path.
+     * @return Hadoop path.
+     */
+    private Path convert(IgfsPath path) {
+        URI fsUri = fileSystemForUser().getUri();
+
+        String scheme = fsUri.getScheme();
+        String authority = fsUri.getAuthority();
+
+        return new Path(scheme, authority, path.toString());
+    }
+
+    /**
+     * Converts an I/O error raised by the secondary file system into an IGFS exception.
+     * The conversion itself is delegated to {@link #cast(String, IOException)}; no additional
+     * checks are performed here.
+     *
+     * @param e Exception to convert.
+     * @param detailMsg Detailed error message.
+     * @return Appropriate exception.
+     */
+    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
+        return cast(detailMsg, e);
+    }
+
+    /**
+     * Cast IO exception to IGFS exception.
+     *
+     * @param e IO exception.
+     * @return IGFS exception.
+     */
+    public static IgfsException cast(String msg, IOException e) {
+        if (e instanceof FileNotFoundException)
+            return new IgfsPathNotFoundException(e);
+        else if (e instanceof ParentNotDirectoryException)
+            return new IgfsParentNotDirectoryException(msg, e);
+        else if (e instanceof PathIsNotEmptyDirectoryException)
+            return new IgfsDirectoryNotEmptyException(e);
+        else if (e instanceof PathExistsException)
+            return new IgfsPathAlreadyExistsException(msg, e);
+        else
+            return new IgfsException(msg, e);
+    }
+
+    /**
+     * Builds the IGFS attribute map (permission, user name, group name) from a Hadoop file status.
+     * The permission is rendered as a zero-padded octal string of at least four digits; when the
+     * status carries no permission, {@link FsPermission#getDefault()} is used.
+     *
+     * @param status File status.
+     * @return IGFS attributes.
+     */
+    private static Map<String, String> properties(FileStatus status) {
+        FsPermission statusPerm = status.getPermission();
+
+        FsPermission effectivePerm = statusPerm != null ? statusPerm : FsPermission.getDefault();
+
+        HashMap<String, String> attrs = new HashMap<>(3);
+
+        attrs.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", effectivePerm.toShort()));
+        attrs.put(IgfsUtils.PROP_USER_NAME, status.getOwner());
+        attrs.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup());
+
+        return attrs;
+    }
+
+    /**
+     * Gets the FileSystem for the current context user. When no user context is set,
+     * the default user name of this delegate is used instead.
+     *
+     * @return the FileSystem instance, never null.
+     */
+    private FileSystem fileSystemForUser() {
+        String ctxUser = IgfsUserContext.currentUser();
+
+        String user = F.isEmpty(ctxUser) ? IgfsUtils.fixUserName(dfltUsrName) : ctxUser;
+
+        assert !F.isEmpty(user);
+
+        try {
+            return (FileSystem)factory.get(user);
+        }
+        catch (IOException ioe) {
+            // Factory failures surface as unchecked exceptions.
+            throw new IgniteException(ioe);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        // The string form is produced by S.toString for this class and this instance.
+        return S.toString(HadoopIgfsSecondaryFileSystemDelegateImpl.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..19c470e
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopKerberosFileSystemFactoryDelegate.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Kerberos Hadoop file system factory delegate.
+ */
+public class HadoopKerberosFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate {
+    /** The re-login interval. */
+    private long reloginInterval;
+
+    /** Time of last re-login attempt, in system milliseconds. */
+    private volatile long lastReloginTime;
+
+    /**
+     * Constructor. Passes the proxy to the superclass constructor; the Kerberos-specific settings
+     * (key tab, principal, re-login interval) are read from the {@code proxy} field in {@link #start()}.
+     *
+     * @param proxy Proxy.
+     */
+    public HadoopKerberosFileSystemFactoryDelegate(KerberosHadoopFileSystemFactory proxy) {
+        super(proxy);
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileSystem getWithMappedName(String name) throws IOException {
+        // Give the Kerberos login a chance to be refreshed (rate-limited inside reloginIfNeeded())
+        // before the file system is obtained from the superclass.
+        reloginIfNeeded();
+
+        return super.getWithMappedName(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
+        UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+
+        // The requested user is impersonated on top of the current login user.
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName, loginUgi);
+
+        PrivilegedExceptionAction<FileSystem> getFsAct = new PrivilegedExceptionAction<FileSystem>() {
+            @Override public FileSystem run() throws Exception {
+                return FileSystem.get(fullUri, cfg);
+            }
+        };
+
+        return proxyUgi.doAs(getFsAct);
+    }
+
+    /**
+     * Starts the delegate: runs the superclass start-up, validates the Kerberos settings of the
+     * proxy (key tab, key tab principal, re-login interval), remembers the re-login interval and
+     * logs in from the key tab.
+     *
+     * @throws IgniteException If login from the key tab fails.
+     */
+    @Override public void start() throws IgniteException {
+        super.start();
+
+        KerberosHadoopFileSystemFactory proxy0 = (KerberosHadoopFileSystemFactory)proxy;
+
+        String keyTab = proxy0.getKeyTab();
+        String keyTabPrincipal = proxy0.getKeyTabPrincipal();
+
+        A.ensure(!F.isEmpty(keyTab), "keyTab cannot be empty.");
+        A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot be empty.");
+        A.ensure(proxy0.getReloginInterval() >= 0, "reloginInterval cannot be negative.");
+
+        reloginInterval = proxy0.getReloginInterval();
+
+        try {
+            // The configuration is installed before the login so that the login uses it.
+            UserGroupInformation.setConfiguration(cfg);
+            UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab);
+        }
+        catch (IOException ioe) {
+            throw new IgniteException("Failed login from keytab [keyTab=" + keyTab +
+                ", keyTabPrincipal=" + keyTabPrincipal + ']', ioe);
+        }
+    }
+
+    /**
+     * Re-logins the user if needed.
+     * First, the re-login interval defined in the factory is checked: re-login attempts are made
+     * no more often than once per {@code reloginInterval} (compared against system milliseconds).
+     * Second, {@code UserGroupInformation.checkTGTAndReloginFromKeytab()} is invoked, which gets the
+     * existing TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login.
+     *
+     * <p>This operation is expected to be called upon each operation with the file system created with the
+     * factory. As long as {@link #get(String)} is invoked for each {@link IgniteHadoopFileSystem}
+     * operation, there is no need to invoke it separately.
+     *
+     * @throws IOException If login fails.
+     */
+    private void reloginIfNeeded() throws IOException {
+        long now = System.currentTimeMillis();
+
+        // Too early for another attempt: the previous one happened less than one interval ago.
+        if (now < lastReloginTime + reloginInterval)
+            return;
+
+        UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+
+        lastReloginTime = now;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
new file mode 100644
index 0000000..1ecbee5
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.GridStringBuilder;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * File system cache utility methods used by Map-Reduce tasks and jobs.
+ */
+public class HadoopFileSystemCacheUtils {
+    /**
+     * A common static factory method. Creates a new HadoopLazyConcurrentMap whose values are
+     * file systems created per {@link FsCacheKey} (URI, user and configuration of the key) with
+     * the Hadoop-side file system cache disabled for the key's URI scheme.
+     *
+     * @return a new HadoopLazyConcurrentMap.
+     */
+    public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
+        HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem> fsFactory =
+            new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+                @Override public FileSystem createValue(FsCacheKey key) throws IOException {
+                    try {
+                        assert key != null;
+
+                        URI fsUri = key.uri();
+
+                        // Copy the configuration to avoid altering the external object.
+                        Configuration cfgCp = new Configuration(key.configuration());
+
+                        // Explicitly disable FileSystem caching for the scheme of this URI.
+                        String disableCacheProp = HadoopFileSystemsUtils.disableFsCachePropertyName(fsUri.getScheme());
+
+                        cfgCp.setBoolean(disableCacheProp, true);
+
+                        return FileSystem.get(fsUri, cfgCp, key.user());
+                    }
+                    catch (InterruptedException e) {
+                        // Restore the interrupt flag before converting to an I/O failure.
+                        Thread.currentThread().interrupt();
+
+                        throw new IOException("Failed to create file system due to interrupt.", e);
+                    }
+                }
+            };
+
+        return new HadoopLazyConcurrentMap<>(fsFactory);
+    }
+
+    /**
+     * Gets the user name as per the Hadoop viewpoint: the value of the
+     * {@link MRJobConfig#USER_NAME} property if it is set in the configuration, otherwise the
+     * result of {@link IgniteHadoopFileSystem#getFsHadoopUser()}.
+     *
+     * @param cfg the Hadoop job configuration; must not be null, since it is dereferenced unconditionally.
+     * @return the user name (the caller in this class asserts that it is not null).
+     * @throws IOException If the fallback user name cannot be resolved (presumably raised by
+     *      {@code getFsHadoopUser()} - confirm).
+     */
+    private static String getMrHadoopUser(Configuration cfg) throws IOException {
+        String cfgUser = cfg.get(MRJobConfig.USER_NAME);
+
+        return cfgUser != null ? cfgUser : IgniteHadoopFileSystem.getFsHadoopUser();
+    }
+
+    /**
+     * Common method to get the V1 file system in MapRed engine.
+     * It gets the file system for the user specified in the configuration with the
+     * {@link MRJobConfig#USER_NAME} property. File systems are created and cached in the given map
+     * upon first request; a {@code null} URI is replaced with the default URI of the configuration.
+     *
+     * @param uri The file system uri.
+     * @param cfg The configuration.
+     * @param map The caching map.
+     * @return The file system.
+     * @throws IOException On error.
+     */
+    public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg,
+        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map)
+            throws IOException {
+        assert map != null;
+        assert cfg != null;
+
+        final String mrUser = getMrHadoopUser(cfg);
+
+        assert mrUser != null;
+
+        final URI fsUri = uri != null ? uri : FileSystem.getDefaultUri(cfg);
+
+        final FileSystem fs;
+
+        try {
+            fs = map.getOrCreate(new FsCacheKey(fsUri, mrUser, cfg));
+        }
+        catch (IgniteException ie) {
+            // Callers expect I/O failures, so the unchecked exception is wrapped.
+            throw new IOException(ie);
+        }
+
+        assert fs != null;
+        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(mrUser, ((IgniteHadoopFileSystem)fs).user());
+
+        return fs;
+    }
+
+    /**
+     * Resolves the file system URI using logic similar to that of {@code FileSystem#get(URI, Configuration, String)}:
+     * a {@code null} URI, or a URI without authority whose scheme is either absent or equal to the
+     * scheme of a default URI that has an authority, is replaced with the default URI.
+     *
+     * @param uri0 The uri.
+     * @param cfg The cfg.
+     * @return Correct URI.
+     */
+    private static URI fixUri(URI uri0, Configuration cfg) {
+        if (uri0 == null)
+            return FileSystem.getDefaultUri(cfg);
+
+        // A URI that carries an authority is used as is.
+        if (uri0.getAuthority() != null)
+            return uri0;
+
+        String scheme = uri0.getScheme();
+
+        URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+        boolean useDflt = scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null);
+
+        return useDflt ? dfltUri : uri0;
+    }
+
+    /**
+     * Key of the file system cache: identifies a file system by user, URI scheme and URI authority.
+     * <p>
+     * Note that configuration is not a part of the key.
+     * It is used solely to initialize the first instance
+     * that is created for the key.
+     */
+    public static final class FsCacheKey {
+        /** File system URI, already passed through {@code fixUri}. */
+        private final URI uri;
+
+        /** User name. */
+        private final String usr;
+
+        /** Precomputed string that backs {@code equals}, {@code hashCode} and {@code toString}. */
+        private final String equalityKey;
+
+        /** Configuration; not a part of the key identity. */
+        private final Configuration cfg;
+
+        /**
+         * Constructor.
+         *
+         * @param uri File system URI, not null.
+         * @param usr User name, not null.
+         * @param cfg Configuration, not null.
+         */
+        public FsCacheKey(URI uri, String usr, Configuration cfg) {
+            assert uri != null;
+            assert usr != null;
+            assert cfg != null;
+
+            this.uri = fixUri(uri, cfg);
+            this.usr = usr;
+            this.cfg = cfg;
+
+            this.equalityKey = createEqualityKey();
+        }
+
+        /**
+         * Creates String key used for equality and hashing, in the form
+         * {@code (user)@scheme://authority} with scheme and authority lower-cased.
+         *
+         * @return Equality key.
+         */
+        private String createEqualityKey() {
+            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+            // Locale.ROOT keeps the key independent of the default locale of the JVM
+            // (a locale-sensitive lower-casing would, e.g., turn "I" into a dotless "i" under a Turkish locale).
+            if (uri.getScheme() != null)
+                sb.a(uri.getScheme().toLowerCase(java.util.Locale.ROOT));
+
+            sb.a("://");
+
+            if (uri.getAuthority() != null)
+                sb.a(uri.getAuthority().toLowerCase(java.util.Locale.ROOT));
+
+            return sb.toString();
+        }
+
+        /**
+         * @return The URI.
+         */
+        public URI uri() {
+            return uri;
+        }
+
+        /**
+         * @return The User.
+         */
+        public String user() {
+            return usr;
+        }
+
+        /**
+         * @return The Configuration.
+         */
+        public Configuration configuration() {
+            return cfg;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
+        @Override public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+
+            // Identity is defined by the precomputed key only; the configuration does not take part.
+            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return equalityKey.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return equalityKey;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
new file mode 100644
index 0000000..68c0dc4
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemsUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FsConstants;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helpers that configure Hadoop file systems so that each thread can have its own working directory.
+ */
+public class HadoopFileSystemsUtils {
+    /** Property name used to pass the working directory to a newly created local FS instance. */
+    public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
+
+    /**
+     * Registers the wrapping local file system implementations in the given configuration.
+     *
+     * @param cfg Configuration to update.
+     */
+    public static void setupFileSystems(Configuration cfg) {
+        final String locScheme = FsConstants.LOCAL_FS_URI.getScheme();
+
+        cfg.set("fs." + locScheme + ".impl", HadoopLocalFileSystemV1.class.getName());
+        cfg.set("fs.AbstractFileSystem." + locScheme + ".impl", HadoopLocalFileSystemV2.class.getName());
+    }
+
+    /**
+     * Builds the name of the property that disables the file system cache for a scheme.
+     * @param scheme The file system URI scheme.
+     * @return The property name; for a {@code null} scheme it is "fs.null.impl.disable.cache".
+     */
+    public static String disableFsCachePropertyName(@Nullable String scheme) {
+        return "fs." + scheme + ".impl.disable.cache";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java
new file mode 100644
index 0000000..681cddb
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLazyConcurrentMap.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * Maps keys to values that are created lazily, on first request, using {@link ValueFactory}.
+ * {@link #close()} closes every created value; afterwards no new values can be created.
+ *
+ * Despite the name, does not depend on any Hadoop classes.
+ */
+public class HadoopLazyConcurrentMap<K, V extends Closeable> {
+    /** The map storing the actual values. */
+    private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
+
+    /** The factory passed in by the client. Will be used for lazy value creation. */
+    private final ValueFactory<K, V> factory;
+
+    /** Lock used to close the objects: value creation holds the read lock, {@link #close()} the write lock. */
+    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
+    /** Flag indicating that this map is closed and cleared. Read under the read lock, set under the write lock. */
+    private boolean closed;
+
+    /**
+     * Constructor. Asserts that this class is loaded by the same class loader as {@link Ignite}.
+     * @param factory the factory to create new values lazily.
+     */
+    public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
+        this.factory = factory;
+
+        assert getClass().getClassLoader() == Ignite.class.getClassLoader();
+    }
+
+    /**
+     * Gets cached or creates a new value of V; the factory is invoked only by the thread that first registers the key.
+     * Never returns null. Throws {@link IllegalStateException} if a value has to be created but the map is closed.
+     * @param k the key to associate the value with.
+     * @return the cached or newly created value, never null.
+     * @throws IgniteException if obtaining the value fails with {@link IgniteCheckedException}.
+     */
+    public V getOrCreate(K k) {
+        ValueWrapper w = map.get(k);
+
+        if (w == null) {
+            closeLock.readLock().lock(); // Shared lock: creators do not block each other, only close() is excluded.
+
+            try {
+                if (closed)
+                    throw new IllegalStateException("Failed to create value for key [" + k
+                        + "]: the map is already closed.");
+
+                final ValueWrapper wNew = new ValueWrapper(k);
+
+                w = map.putIfAbsent(k, wNew);
+
+                if (w == null) { // This thread registered the wrapper, so it is the one that runs the factory.
+                    wNew.init();
+
+                    w = wNew;
+                }
+            }
+            finally {
+                closeLock.readLock().unlock();
+            }
+        }
+
+        try {
+            V v = w.getValue(); // May wait here if another thread is still initializing this wrapper.
+
+            assert v != null;
+
+            return v;
+        }
+        catch (IgniteCheckedException ie) {
+            throw new IgniteException(ie);
+        }
+    }
+
+    /**
+     * Clears the map and closes all the values; no-op if already closed. The first close failure is rethrown at the end.
+     */
+    public void close() throws IgniteCheckedException {
+        closeLock.writeLock().lock();
+
+        try {
+            if (closed)
+                return;
+
+            closed = true;
+
+            Exception err = null;
+
+            Set<K> keySet = map.keySet();
+
+            for (K key : keySet) {
+                V v = null;
+
+                try {
+                    v = map.get(key).getValue();
+                }
+                catch (IgniteCheckedException ignore) {
+                    // No-op: no value could be obtained for this key, so there is nothing to close.
+                }
+
+                if (v != null) {
+                    try {
+                        v.close();
+                    }
+                    catch (Exception err0) {
+                        if (err == null) // Remember only the first failure and keep closing the remaining values.
+                            err = err0;
+                    }
+                }
+            }
+
+            map.clear();
+
+            if (err != null)
+                throw new IgniteCheckedException(err);
+        }
+        finally {
+            closeLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Helper class that drives the lazy value creation.
+     */
+    private class ValueWrapper {
+        /** Future completed by {@link #init()} with either the created value or the creation error. */
+        private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();
+
+        /** Key the value is created for. */
+        private final K key;
+
+        /**
+         * Creates new wrapper for the given key; the value itself is not created until {@link #init()} is called.
+         */
+        private ValueWrapper(K key) {
+            this.key = key;
+        }
+
+        /**
+         * Initializes the value using the factory. Any failure, including a null value, completes the future with an error.
+         */
+        private void init() {
+            try {
+                final V v0 = factory.createValue(key);
+
+                if (v0 == null)
+                    throw new IgniteException("Failed to create non-null value. [key=" + key + ']');
+
+                fut.onDone(v0);
+            }
+            catch (Throwable e) {
+                fut.onDone(e);
+            }
+        }
+
+        /**
+         * Gets the available value or blocks until the value is initialized.
+         * @return the value, never null.
+         * @throws IgniteCheckedException on error.
+         */
+        V getValue() throws IgniteCheckedException {
+            return fut.get();
+        }
+    }
+
+    /**
+     * Interface representing the factory that creates map values.
+     * @param <K> the type of the key.
+     * @param <V> the type of the value.
+     */
+    public interface ValueFactory <K, V> {
+        /**
+         * Creates the new value. Should never return null.
+         *
+         * @param key the key to create value for
+         * @return the value.
+         * @throws IOException On failure.
+         */
+        public V createValue(K key) throws IOException;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java
new file mode 100644
index 0000000..cbb007f
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV1.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import java.io.File;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Replacement of the local file system for Hadoop jobs, backed by {@link HadoopRawLocalFileSystem}.
+ */
+public class HadoopLocalFileSystemV1 extends LocalFileSystem {
+    /** Creates the file system on top of a new {@link HadoopRawLocalFileSystem}. */
+    public HadoopLocalFileSystemV1() {
+        super(new HadoopRawLocalFileSystem());
+    }
+
+    /** {@inheritDoc} */
+    @Override public File pathToFile(Path path) {
+        final HadoopRawLocalFileSystem rawFs = (HadoopRawLocalFileSystem)getRaw();
+
+        return rawFs.convert(path);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java
new file mode 100644
index 0000000..2484492
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopLocalFileSystemV2.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFs;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.local.LocalConfigKeys;
+
+import static org.apache.hadoop.fs.FsConstants.LOCAL_FS_URI;
+
+/**
+ * Local file system replacement for Hadoop jobs: a {@link ChecksumFs} over a delegate backed by {@link HadoopRawLocalFileSystem}.
+ */
+public class HadoopLocalFileSystemV2 extends ChecksumFs {
+    /**
+     * Creates new local file system.
+     *
+     * @param cfg Configuration.
+     * @throws IOException If failed.
+     * @throws URISyntaxException If failed.
+     */
+    public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException {
+        super(new DelegateFS(cfg));
+    }
+
+    /**
+     * Creates new local file system. Only delegates to the constructor taking the configuration.
+     *
+     * @param uri URI (not used by this constructor).
+     * @param cfg Configuration.
+     * @throws IOException If failed.
+     * @throws URISyntaxException If failed.
+     */
+    public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException {
+        this(cfg);
+    }
+
+    /**
+     * Delegate file system.
+     */
+    private static class DelegateFS extends DelegateToFileSystem {
+        /**
+         * Creates new delegate for {@code LOCAL_FS_URI} on top of a new {@link HadoopRawLocalFileSystem}.
+         *
+         * @param cfg Configuration.
+         * @throws IOException If failed.
+         * @throws URISyntaxException If failed.
+         */
+        public DelegateFS(Configuration cfg) throws IOException, URISyntaxException {
+            super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false);
+        }
+
+        /** {@inheritDoc} Always {@code -1}. */
+        @Override public int getUriDefaultPort() {
+            return -1;
+        }
+
+        /** {@inheritDoc} Uses the defaults provided by {@link LocalConfigKeys}. */
+        @Override public FsServerDefaults getServerDefaults() throws IOException {
+            return LocalConfigKeys.getServerDefaults();
+        }
+
+        /** {@inheritDoc} Accepts every name. */
+        @Override public boolean isValidName(String src) {
+            return true;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java
new file mode 100644
index 0000000..0aac4a3
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopParameters.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+/**
+ * This class lists parameters that can be specified in Hadoop configuration.
+ * Hadoop configuration can be specified in {@code core-site.xml} file
+ * or passed to map-reduce task directly when using Hadoop driver for IGFS file system:
+ * <ul>
+ *     <li>
+ *         {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides
+ *         the one specified in {@link org.apache.ignite.configuration.FileSystemConfiguration#getSequentialReadsBeforePrefetch()}
+ *         IGFS data node configuration property.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If
+ *         {@code true}, then all file system operations will be logged to a file.
+ *     </li>
+ *     <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li>
+ *     <li>
+ *         {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before
+ *         it gets flushed to log file. Higher values will imply greater performance, but will increase delay
+ *         before record appears in the log file.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data
+ *         node to which client is connected. If {@code true}, file will not be distributed and will be written
+ *         to a single data node. Default value is {@code true}.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.prefer.local.writes} - specifies whether file preferably should be written to
+ *         local data node if it has enough free space. After some time it can be redistributed across nodes though.
+ *     </li>
+ * </ul>
+ * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. in
+ * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}.
+ * <p>
+ * Sample configuration that can be placed to {@code core-site.xml} file:
+ * <pre name="code" class="xml">
+ *     &lt;property&gt;
+ *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.enabled&lt;/name&gt;
+ *         &lt;value&gt;true&lt;/value&gt;
+ *     &lt;/property&gt;
+ *     &lt;property&gt;
+ *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.dir&lt;/name&gt;
+ *         &lt;value&gt;/home/apache/ignite/log/sampling&lt;/value&gt;
+ *     &lt;/property&gt;
+ *     &lt;property&gt;
+ *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.batch_size&lt;/name&gt;
+ *         &lt;value&gt;16&lt;/value&gt;
+ *     &lt;/property&gt;
+ * </pre>
+ * Parameters could also be specified per mapreduce job, e.g.
+ * <pre name="code" class="bash">
+ * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4
+ * </pre>
+ * If you want to use these parameters in code, you have to substitute your file system name into them. The easiest
+ * way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}.
+ */
+public class HadoopParameters {
+    /** Parameter name for control over file colocation write mode; {@code %s} is the file system name. */
+    public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes";
+
+    /** Parameter name for custom sequential reads before prefetch value; {@code %s} is the file system name. */
+    public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH =
+        "fs.igfs.%s.open.sequential_reads_before_prefetch";
+
+    /** Parameter name for client logger directory; {@code %s} is the file system name. */
+    public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir";
+
+    /** Parameter name for log batch size; {@code %s} is the file system name. */
+    public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size";
+
+    /** Parameter name for log enabled flag; {@code %s} is the file system name. */
+    public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled";
+
+    /** Parameter name for prefer local writes flag. Contains no {@code %s} placeholder. */
+    public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes";
+}
\ No newline at end of file


Mime
View raw message