ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [26/27] incubator-ignite git commit: [IGNITE-218]: intermediate commit.
Date Fri, 17 Apr 2015 11:41:32 GMT
[IGNITE-218]: intermediate commit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dedf5385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dedf5385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dedf5385

Branch: refs/heads/ignite-218
Commit: dedf5385b6e8ceb5e4924f0dfb60e7e80454709f
Parents: 7a659ce
Author: iveselovskiy <iveselovskiy@gridgain.com>
Authored: Fri Apr 17 14:40:21 2015 +0300
Committer: iveselovskiy <iveselovskiy@gridgain.com>
Committed: Fri Apr 17 14:40:21 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/igfs/IgfsUserContext.java |  34 +---
 .../internal/igfs/common/IgfsMarshaller.java    |  32 ----
 .../igfs/common/IgfsPathControlRequest.java     |  14 +-
 .../internal/processors/igfs/IgfsUtils.java     |  16 ++
 .../fs/IgniteHadoopFileSystemCounterWriter.java |   4 +-
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java |  34 ++--
 .../ignite/hadoop/fs/LazyConcurrentMap.java     | 177 ------------------
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |   4 +-
 .../internal/processors/hadoop/HadoopUtils.java |   3 +-
 .../hadoop/SecondaryFileSystemProvider.java     |  10 +-
 .../hadoop/fs/HadoopFileSystemsUtils.java       |  19 --
 .../hadoop/fs/HadoopLazyConcurrentMap.java      | 179 +++++++++++++++++++
 .../hadoop/igfs/HadoopIgfsInProc.java           |   2 +-
 .../hadoop/igfs/HadoopIgfsOutProc.java          |   2 +-
 .../hadoop/igfs/HadoopIgfsWrapper.java          |   7 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java |  23 ++-
 .../processors/hadoop/v2/HadoopV2Job.java       |   3 +-
 .../hadoop/v2/HadoopV2JobResourceManager.java   |   9 +-
 .../processors/hadoop/HadoopStartup.java        |   3 +-
 parent/pom.xml                                  |   2 +-
 20 files changed, 270 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
index 926e84d..e48507f 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.igfs;
 
 import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
@@ -47,18 +46,12 @@ public abstract class IgfsUserContext {
      * must return exactly the Exception thrown from the callable.
      */
     public static <T> T doAs(String user, final Callable<T> cllbl) {
-        // TODO: Use A.ensure();
-        if (F.isEmpty(user))
-            throw new IllegalArgumentException("Failed to use null or empty user name.");
-
-        // TODO: Remove.
-        user = user.intern();
+        A.ensure(!F.isEmpty(user), "Failed to use null or empty user name.");
 
         final String ctxUser = userStackThreadLocal.get();
 
         try {
-            // TODO: Equals: F.eq
-            if (ctxUser == user)
+            if (F.eq(ctxUser, user))
                 return cllbl.call(); // correct context is already there
 
             userStackThreadLocal.set(user);
@@ -77,27 +70,12 @@ public abstract class IgfsUserContext {
 
     /**
      * Gets the current context user.
-     * If this method is invoked outside of any doAs() on call stack, it will return null.
-     * Note that the returned user name is always interned, so
-     * you may compare the names using '==' reference equality.
-     * @return the current user, never null.
+     * If this method is invoked outside of any {@link #doAs(String, Callable)} on the call
stack, it will return null.
+     * Otherwise it will return the user name set in the most lower {@link #doAs(String,
Callable)} call
+     * on the call stack.
+     * @return the current user, may be null.
      */
     @Nullable public static String currentUser() {
         return userStackThreadLocal.get();
     }
-
-    /**
-     * Provides non-null interned user name.
-     * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME},
-     * which is the current process owner user.
-     * @param user a user name to be fixed.
-     * @return non-null interned user name.
-     */
-    // TODO: Move to IgfsUtils.
-    public static String fixUserName(@Nullable String user) {
-        if (F.isEmpty(user))
-           user = FileSystemConfiguration.DFLT_USER_NAME;
-
-        return user.intern();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
index a4c7830..6a6f22a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
@@ -301,8 +301,6 @@ public class IgfsMarshaller {
                 }
             }
 
-            assert msg != null;
-
             msg.command(cmd);
 
             return msg;
@@ -344,34 +342,4 @@ public class IgfsMarshaller {
 
         return null;
     }
-
-    /**
-     * Writes string to output.
-     *
-     * @param out Data output.
-     * @param str String.
-     * @throws IOException If write failed.
-     */
-    private void writeString(DataOutput out, @Nullable String str) throws IOException {
-        out.writeBoolean(str != null);
-
-        if (str != null)
-            out.writeUTF(str);
-    }
-
-    /**
-     * Reads string from input.
-     *
-     * @param in Data input.
-     * @return Read string.
-     * @throws IOException If read failed.
-     */
-    @Nullable private String readString(DataInput in) throws IOException {
-        boolean hasStr = in.readBoolean();
-
-        if (hasStr)
-            return in.readUTF();
-
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
index cfc8f16..2f6e6e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.igfs.common;
 
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
@@ -63,7 +64,7 @@ public class IgfsPathControlRequest extends IgfsMessage {
     /** Last modification time. */
     private long modificationTime;
 
-    // TODO: COmments.
+    /** The user name this control request is made on behalf of. */
     private String userName;
 
     /**
@@ -239,14 +240,21 @@ public class IgfsPathControlRequest extends IgfsMessage {
         return S.toString(IgfsPathControlRequest.class, this, "cmd", command());
     }
 
-    // TODO: COmments.
+    /**
+     * Getter for the user name.
+     * @return user name.
+     */
     public final String userName() {
         assert userName != null;
 
         return userName;
     }
 
+    /**
+     * Setter for the user name.
+     * @param userName the user name.
+     */
     public final void userName(String userName) {
-        this.userName = IgfsUserContext.fixUserName(userName);
+        this.userName = IgfsUtils.fixUserName(userName);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 2a915ec..558ef8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
 
 import java.lang.reflect.*;
 
@@ -83,4 +85,18 @@ public class IgfsUtils {
     private IgfsUtils() {
         // No-op.
     }
+
+    /**
+     * Provides non-null interned user name.
+     * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME},
+     * which is the current process owner user.
+     * @param user a user name to be fixed.
+     * @return non-null interned user name.
+     */
+    public static String fixUserName(@Nullable String user) {
+        if (F.isEmpty(user))
+           user = FileSystemConfiguration.DFLT_USER_NAME;
+
+        return user;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 3b8c28e..821acdb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -79,8 +79,8 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
             try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobStatPath.toUri(),
hadoopCfg)) {
                 fs.mkdirs(jobStatPath);
 
-                // TODO: OUt-of-bound
-                try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME))))
{
+                try (PrintStream out = new PrintStream(fs.create(
+                        new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
                     for (T2<String, Long> evt : perfCntr.evts()) {
                         out.print(evt.get1());
                         out.print(':');

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index a6db645..024cc68 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -24,12 +24,12 @@ import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.igfs.secondary.*;
 import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.igfs.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
-import org.apache.ignite.hadoop.fs.LazyConcurrentMap.*;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.*;
 
 import java.io.*;
 import java.net.*;
@@ -57,8 +57,12 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** The default user name. It is used if no user context is set. */
     private final String dfltUserName;
 
+    /** FileSystem instance created for the default user.
+     * Stored outside the fileSysLazyMap due to performance reasons. */
+    private final FileSystem dfltFs;
+
     /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method.
*/
-    private final LazyConcurrentMap<String, FileSystem> fileSysLazyMap = new LazyConcurrentMap<>(
+    private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new
HadoopLazyConcurrentMap<>(
         new ValueFactory<String, FileSystem>() {
             @Override public FileSystem createValue(String key) {
                 try {
@@ -116,20 +120,20 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
         if (F.isEmpty(userName))
             userName = null;
 
-        this.dfltUserName = IgfsUserContext.fixUserName(userName);
+        this.dfltUserName = IgfsUtils.fixUserName(userName);
 
         try {
             this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath);
+
+            // File system creation for the default user name.
+            // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field:
+            this.dfltFs = secProvider.createFileSystem(dfltUserName);
         }
         catch (IOException e) {
             throw new IgniteCheckedException(e);
         }
 
-        // Test filesystem creation for the default user name.
-        // The value is stored in the 'fileSysLazyMap' cache.
-        FileSystem fileSys = fileSysLazyMap.getOrCreate(dfltUserName);
-
-        assert fileSys != null;
+        assert dfltFs != null;
 
         uri = secProvider.uri().toString();
 
@@ -351,9 +355,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
             new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
 
         try {
-            // TODO: Out of bounds.
-            return fileSysForUser().create(convert(path), props0.permission(), overwrite,
bufSize, (short)replication, blockSize,
-                null);
+            return fileSysForUser().create(convert(path), props0.permission(), overwrite,
bufSize,
+                (short)replication, blockSize, null);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props="
+ props +
@@ -465,13 +468,11 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteCheckedException {
-        final LazyConcurrentMap<String,FileSystem> map = fileSysLazyMap;
+        final HadoopLazyConcurrentMap<String,FileSystem> map = fileSysLazyMap;
 
         if (map == null)
             return; // already cleared.
 
-        fileSysLazyMap = null; // 'this' will be unusable after #close().
-
         List<IOException> ioExs = new LinkedList<>();
 
         Set<String> keySet = map.keySet();
@@ -516,6 +517,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
 
         assert !F.isEmpty(user);
 
+        if (F.eq(user, dfltUserName))
+            return dfltFs; // optimization
+
         return fileSysLazyMap.getOrCreate(user);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java
deleted file mode 100644
index 4c592af..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.fs;
-
-import org.apache.ignite.*;
-import org.jetbrains.annotations.*;
-import org.jsr166.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Maps values by keys.
- * Values are created lazily using {@link ValueFactory}.
- */
-// TODO: Remove from public.
-// TODO: Consistent naming (Hadoop prefix if in Hadoop module).
-public class LazyConcurrentMap<K, V> {
-    /** The map storing the actual values. */
-    private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
-
-    /** The factory passed in by the client. Will be used for lazy value creation. */
-    private final ValueFactory<K, V> factory;
-
-    /**
-     * Constructor.
-     * @param factory the factory to create new values lazily.
-     */
-    public LazyConcurrentMap(ValueFactory<K, V> factory) {
-        this.factory = factory;
-    }
-
-    /**
-     * Gets cached or creates a new value of V.
-     * Never returns null.
-     * @param k the key to associate the value with.
-     * @return the cached or newly created value, never null.
-     * @throws IgniteException on error
-     */
-    public V getOrCreate(K k) {
-        final ValueWrapper wNew = new ValueWrapper(k);
-
-        ValueWrapper w = map.putIfAbsent(k, wNew);
-
-        if (w == null) {
-            // new wrapper 'w' has been put, so init the value:
-            wNew.init();
-
-            w = wNew;
-        }
-
-        try {
-            V v = w.getValue();
-
-            assert v != null;
-
-            return v;
-        }
-        catch (InterruptedException ie) {
-            throw new IgniteException(ie);
-        }
-    }
-
-    /**
-     * Gets the value without any attempt to create a new one.
-     * @param k the key
-     * @return the value, or null if there is no value for this key.
-     */
-    public @Nullable V get(K k) {
-        ValueWrapper w = map.get(k);
-
-        if (w == null)
-            return null;
-
-        try {
-            return w.getValue();
-        }
-        catch (InterruptedException ie) {
-            throw new IgniteException(ie);
-        }
-    }
-
-    /**
-     * Gets the keySet of this map,
-     * the contract is as per {@link ConcurrentMap#keySet()}
-     * @return the set of keys, never null.
-     */
-    public Set<K> keySet() {
-        return map.keySet();
-    }
-
-    /**
-     * Clears the map.
-     * Follows the contract of {@link ConcurrentMap#clear()}
-     */
-    public void clear() {
-        map.clear();
-    }
-
-
-    /**
-     * Helper class that drives the lazy value creation.
-     */
-    private class ValueWrapper {
-        /** Value creation latch */
-        private final CountDownLatch vlueCrtLatch = new CountDownLatch(1);
-
-        /** the key */
-        private final K key;
-
-        /** the value */
-        private V v;
-
-        /**
-         * Creates new wrapper.
-         */
-        private ValueWrapper(K key) {
-            this.key = key;
-        }
-
-        /**
-         * Initializes the value using the factory.
-         */
-        private void init() {
-            final V v0 = factory.createValue(key);
-
-            if (v0 == null)
-                throw new IgniteException("Failed to create non-null value. [key=" + key
+ ']');
-
-            v = v0;
-
-            vlueCrtLatch.countDown();
-        }
-
-        /**
-         * Blocks until the value is initialized.
-         * @return the value
-         * @throws InterruptedException
-         */
-        @Nullable V getValue() throws InterruptedException {
-            // TODO: Use U.await(vlueCrtLatch) instead.
-            vlueCrtLatch.await();
-
-            return v;
-        }
-    }
-
-    /**
-     * Interface representing the factory that creates map values.
-     * @param <K> the type of the key.
-     * @param <V> the type of the value.
-     */
-    public interface ValueFactory <K, V> {
-        /**
-         * Creates the new value. Must never return null.
-         * @param key the key to create value for
-         * @return the value.
-         * @throws IgniteException on failure.
-         */
-        public V createValue(K key);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 6fd6d1b..46c9ba4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -180,7 +180,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
         String user = null;
 
         // -------------------------------------------
-        // TODO: Temporary workaround.
+        // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
         // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
         // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in
correct
         // ugi.doAs() closure.
@@ -194,7 +194,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
                 user = currUgi.getShortUserName();
         }
 
-        user = IgfsUserContext.fixUserName(user);
+        user = IgfsUtils.fixUserName(user);
 
         assert user != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 04c5ec2..d493bd4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -126,7 +126,8 @@ public class HadoopUtils {
                 break;
 
             case PHASE_REDUCE:
-                // TODO: Create ticket: why PHASE_REDUCE could have 0 reducers.
+                // TODO: temporary fixed, but why PHASE_REDUCE could have 0 reducers?
+                // See https://issues.apache.org/jira/browse/IGNITE-764
                 setupProgress = 1;
                 mapProgress = 1;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index e49da8e..26fead9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.hadoop;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.security.*;
-import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
@@ -54,9 +54,7 @@ public class SecondaryFileSystemProvider {
      * @throws IOException
      */
     public SecondaryFileSystemProvider(final @Nullable String secUri,
-        final @Nullable String secConfPath/*, @Nullable String userName*/) throws IOException
{
-        //this.userName = userName;
-
+        final @Nullable String secConfPath) throws IOException {
         if (secConfPath != null) {
             URL url = U.resolveIgniteUrl(secConfPath);
 
@@ -92,7 +90,7 @@ public class SecondaryFileSystemProvider {
      * @throws IOException
      */
     public FileSystem createFileSystem(String userName) throws IOException {
-        userName = IgfsUserContext.fixUserName(userName);
+        userName = IgfsUtils.fixUserName(userName);
 
         final FileSystem fileSys;
 
@@ -117,7 +115,7 @@ public class SecondaryFileSystemProvider {
      * @throws IOException in case of error.
      */
     public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException
{
-        userName = IgfsUserContext.fixUserName(userName);
+        userName = IgfsUtils.fixUserName(userName);
 
 //        if (userName == null)
 //            return AbstractFileSystem.get(uri, cfg);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
index 7631ae9..d90bc28 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.fs;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.hadoop.fs.v1.*;
 
 /**
  * Utilities for configuring file systems to support the separate working directory per each
thread.
@@ -29,20 +27,6 @@ public class HadoopFileSystemsUtils {
     /** Name of the property for setting working directory on create new local FS instance.
*/
     public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme()
+ ".workDir";
 
-//    /**
-//     * Set user name and default working directory for current thread if it's supported
by file system.
-//     *
-//     * @param fs File system.
-//     * @param userName User name.
-//     */
-//    @Deprecated // TODO: remove this method.
-//    public static void setUser(FileSystem fs, String userName) {
-//        if (fs instanceof IgniteHadoopFileSystem)
-//            ((IgniteHadoopFileSystem)fs).setUser(userName);
-////        else if (fs instanceof HadoopDistributedFileSystem)
-////            ((HadoopDistributedFileSystem)fs).setUser(userName);
-//    }
-
     /**
      * Setup wrappers of filesystems to support the separate working directory.
      *
@@ -52,8 +36,5 @@ public class HadoopFileSystemsUtils {
         cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
         cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
                 HadoopLocalFileSystemV2.class.getName());
-
-//        // TODO: this should be removed:
-//        cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
new file mode 100644
index 0000000..cdafdde
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Maps values by keys.
+ * Values are created lazily using {@link ValueFactory}.
+ * Currently only {@link #clear()} method can remove a value.
+ *
+ * Despite of the name, does not depend on any Hadoop classes.
+ */
+public class HadoopLazyConcurrentMap<K, V> {
+    /** The map storing the actual values. */
+    private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
+
+    /** The factory passed in by the client. Will be used for lazy value creation. */
+    private final ValueFactory<K, V> factory;
+
+    /**
+     * Constructor.
+     * @param factory the factory to create new values lazily.
+     */
+    public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
+        this.factory = factory;
+    }
+
+    /**
+     * Gets cached or creates a new value of V.
+     * Never returns null.
+     * @param k the key to associate the value with.
+     * @return the cached or newly created value, never null.
+     * @throws IgniteException on error
+     */
+    public V getOrCreate(K k) {
+        final ValueWrapper wNew = new ValueWrapper(k);
+
+        ValueWrapper w = map.putIfAbsent(k, wNew);
+
+        if (w == null) {
+            // new wrapper 'w' has been put, so init the value:
+            wNew.init();
+
+            w = wNew;
+        }
+
+        try {
+            V v = w.getValue();
+
+            assert v != null;
+
+            return v;
+        }
+        catch (IgniteInterruptedCheckedException ie) {
+            throw new IgniteException(ie);
+        }
+    }
+
+    /**
+     * Gets the value without any attempt to create a new one.
+     * @param k the key
+     * @return the value, or null if there is no value for this key.
+     */
+    public @Nullable V get(K k) {
+        ValueWrapper w = map.get(k);
+
+        if (w == null)
+            return null;
+
+        try {
+            return w.getValue();
+        }
+        catch (IgniteInterruptedCheckedException ie) {
+            throw new IgniteException(ie);
+        }
+    }
+
+    /**
+     * Gets the keySet of this map,
+     * the contract is as per {@link ConcurrentMap#keySet()}
+     * @return the set of keys, never null.
+     */
+    public Set<K> keySet() {
+        return map.keySet();
+    }
+
+    /**
+     * Clears the map.
+     * Follows the contract of {@link ConcurrentMap#clear()}
+     */
+    public void clear() {
+        map.clear();
+    }
+
+
+    /**
+     * Helper class that drives the lazy value creation.
+     */
+    private class ValueWrapper {
+        /** Value creation latch */
+        private final CountDownLatch vlueCrtLatch = new CountDownLatch(1);
+
+        /** the key */
+        private final K key;
+
+        /** the value */
+        private V v;
+
+        /**
+         * Creates new wrapper.
+         */
+        private ValueWrapper(K key) {
+            this.key = key;
+        }
+
+        /**
+         * Initializes the value using the factory.
+         */
+        private void init() {
+            final V v0 = factory.createValue(key);
+
+            if (v0 == null)
+                throw new IgniteException("Failed to create non-null value. [key=" + key
+ ']');
+
+            v = v0;
+
+            vlueCrtLatch.countDown();
+        }
+
+        /**
+         * Blocks until the value is initialized.
+         * @return the value
+         * @throws IgniteInterruptedCheckedException if interrupted during wait.
+         */
+        @Nullable V getValue() throws IgniteInterruptedCheckedException {
+            U.await(vlueCrtLatch);
+
+            return v;
+        }
+    }
+
+    /**
+     * Interface representing the factory that creates map values.
+     * @param <K> the type of the key.
+     * @param <V> the type of the value.
+     */
+    public interface ValueFactory <K, V> {
+        /**
+         * Creates the new value. Must never return null.
+         * @param key the key to create value for
+         * @return the value.
+         * @throws IgniteException on failure.
+         */
+        public V createValue(K key);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index 771388a..ed7f296 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -56,7 +56,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
      * @param log Log.
      */
     public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException
{
-        this.user = IgfsUserContext.fixUserName(userName);
+        this.user = IgfsUtils.fixUserName(userName);
 
         this.igfs = igfs;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
index 639f2eb..f23c62c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
@@ -141,7 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         this.grid = grid;
         this.igfs = igfs;
         this.log = log;
-        this.userName = IgfsUserContext.fixUserName(user);
+        this.userName = IgfsUtils.fixUserName(user);
 
         io = HadoopIgfsIpcIo.get(log, endpoint);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
index b650318..7d0db49 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -66,8 +66,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
      * @param conf Configuration.
      * @param log Current logger.
      */
-    // TODO: Out of bounds.
-    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log,
String user) throws IOException {
+    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log,
String user)
+            throws IOException {
         try {
             this.authority = authority;
             this.endpoint = new HadoopIgfsEndpoint(authority);
@@ -373,7 +373,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
                 }
                 catch (IOException | IgniteCheckedException e) {
                     if (e instanceof HadoopIgfsCommunicationException)
-                        hadoop.close(true);
+                        if (hadoop != null)
+                            hadoop.close(true);
 
                     if (log.isDebugEnabled())
                         log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.",
e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index b47bedd..131c870 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -126,15 +126,7 @@ public abstract class HadoopRunnableTask implements Callable<Void>
{
         else {
             // do the call in the context of 'user':
             try {
-                final String ticketCachePath;
-
-                if (job instanceof HadoopV2Job) {
-                    Configuration conf = ((HadoopV2Job)job).jobConf();
-
-                    ticketCachePath = conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
-                }
-                else
-                    ticketCachePath = job.info().property(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+                final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
 
                 UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath,
user);
 
@@ -150,6 +142,19 @@ public abstract class HadoopRunnableTask implements Callable<Void>
{
     }
 
     /**
+     * Gets the job property.
+     */
+    private String getJobProperty(String key) {
+        if (job instanceof HadoopV2Job) {
+            Configuration conf = ((HadoopV2Job)job).jobConf();
+
+            return conf.get(key);
+        }
+        else
+            return job.info().property(key);
+    }
+
+    /**
      * Runnable task call implementation
      * @return null.
      * @throws IgniteCheckedException

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index 0d40f34..8ffab14 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.hadoop.v2;
 
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.JobID;
@@ -323,7 +322,7 @@ public class HadoopV2Job implements HadoopJob {
 
     /**
      * Getter for job configuration.
-     * @return
+     * @return the job configuration
      */
     public JobConf jobConf() {
         return jobConf;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index f75425e..7dcd10b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -166,12 +166,15 @@ public class HadoopV2JobResourceManager {
                     // TODO: Out of bounds.
                     try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg)) {
                         if (!fs.exists(stagingDir))
-                            throw new IgniteCheckedException("Failed to find map-reduce submission
directory (does not exist): " +
+                            throw new IgniteCheckedException("Failed to find map-reduce "
+
+                                "submission directory (does not exist): " +
                                 stagingDir);
 
                         if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
-                            throw new IgniteCheckedException("Failed to copy job submission
directory contents to local file system " +
-                                "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+ ", jobId=" + jobId + ']');
+                            throw new IgniteCheckedException("Failed to copy job submission
" +
+                                "directory contents to local file system " +
+                                "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+                                + ", jobId=" + jobId + ']');
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
index 1d398b5..1a93223 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
@@ -39,8 +39,7 @@ public class HadoopStartup {
     public static Configuration configuration() {
         Configuration cfg = new Configuration();
 
-        // TODO: Remove.
-        cfg.set("fs.defaultFS", "igfs://igfs@localhost:10500");
+        cfg.set("fs.defaultFS", "igfs://igfs@localhost");
 
         cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName());
         cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.class.getName());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index cb84f7f..661b310 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -35,7 +35,7 @@
 
     <properties>
         <ignite.edition>fabric</ignite.edition>
-        <hadoop.version>2.6.0</hadoop.version>
+        <hadoop.version>2.4.1</hadoop.version>
         <spring.version>4.1.0.RELEASE</spring.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven.build.timestamp.format>MMMM d yyyy</maven.build.timestamp.format>


Mime
View raw message