Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3CF5317B9C for ; Tue, 14 Apr 2015 14:05:03 +0000 (UTC) Received: (qmail 76014 invoked by uid 500); 14 Apr 2015 14:05:03 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 75987 invoked by uid 500); 14 Apr 2015 14:05:03 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 75976 invoked by uid 99); 14 Apr 2015 14:05:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Apr 2015 14:05:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,FUZZY_CPILL,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 14 Apr 2015 14:04:58 +0000 Received: (qmail 74858 invoked by uid 99); 14 Apr 2015 14:04:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Apr 2015 14:04:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2EE5DE03C0; Tue, 14 Apr 2015 14:04:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 14 Apr 2015 14:04:39 -0000 Message-Id: In-Reply-To: <70966650632e4adb884a8c49a14b12f9@git.apache.org> References: <70966650632e4adb884a8c49a14b12f9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/4] incubator-ignite git commit: [IGNITE-218] workable version X-Virus-Checked: Checked by ClamAV on apache.org [IGNITE-218] workable version Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1634a685 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1634a685 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1634a685 Branch: refs/heads/ignite-218 Commit: 1634a685ad0817659fe80e0d5809f75faa6616ad Parents: f11e357 Author: iveselovskiy Authored: Tue Apr 14 16:57:19 2015 +0300 Committer: iveselovskiy Committed: Tue Apr 14 16:57:19 2015 +0300 ---------------------------------------------------------------------- .../igfs/secondary/IgfsSecondaryFileSystem.java | 14 ++ .../igfs/common/IgfsHandshakeRequest.java | 13 ++ .../internal/igfs/common/IgfsMarshaller.java | 5 + .../igfs/common/IgfsPathControlRequest.java | 12 ++ .../internal/processors/hadoop/HadoopJob.java | 2 +- .../internal/processors/igfs/IgfsAsyncImpl.java | 10 + .../ignite/internal/processors/igfs/IgfsEx.java | 14 ++ .../internal/processors/igfs/IgfsImpl.java | 158 ++++++++++++--- .../processors/igfs/IgfsIpcHandler.java | 48 +++-- .../igfs/IgfsSecondaryFileSystemImpl.java | 16 +- .../internal/processors/igfs/IgfsServer.java | 4 +- .../ignite/internal/util/IgniteUtils.java | 12 ++ .../fs/IgniteHadoopFileSystemCounterWriter.java | 6 +- .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 42 +++- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 124 +++++++++--- .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 26 ++- .../internal/processors/hadoop/HadoopUtils.java | 8 +- .../hadoop/SecondaryFileSystemProvider.java | 24 ++- .../hadoop/fs/HadoopDistributedFileSystem.java | 199 ++++++++++--------- .../hadoop/fs/HadoopFileSystemsUtils.java | 28 +-- .../processors/hadoop/igfs/HadoopIgfsEx.java | 6 + .../hadoop/igfs/HadoopIgfsInProc.java | 17 +- .../processors/hadoop/igfs/HadoopIgfsIpcIo.java | 2 +- .../hadoop/igfs/HadoopIgfsOutProc.java | 34 +++- .../hadoop/igfs/HadoopIgfsWrapper.java | 14 +- .../hadoop/taskexecutor/HadoopRunnableTask.java | 46 +++++ .../processors/hadoop/v2/HadoopV2Job.java | 38 +++- .../hadoop/v2/HadoopV2JobResourceManager.java | 109 +++++++--- .../hadoop/v2/HadoopV2TaskContext.java | 9 +- .../IgniteHadoopFileSystemClientSelfTest.java | 2 +- .../HadoopDefaultMapReducePlannerSelfTest.java | 12 ++ .../hadoop/HadoopFileSystemsTest.java | 2 +- .../processors/hadoop/HadoopStartup.java | 2 +- .../hadoop/examples/HadoopWordCount2.java | 24 ++- parent/pom.xml | 2 +- 35 files changed, 819 insertions(+), 265 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java index 9026eac..c2c610c 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java @@ -198,4 +198,18 @@ public interface IgfsSecondaryFileSystem { * @return Map of properties. */ public Map properties(); + + /** + * Gets an instance of 2ndary Fs for the specified user (may be 'this'). + * @param userName the user name + * @return the instance + * @throws IgniteCheckedException + */ + public IgfsSecondaryFileSystem forUser(String userName) throws IgniteCheckedException; + + /** + * The user name this 2ndary Fs works on behalf of. + * @return the user name. + */ + public String getUser(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java index ec8ef6e..4d2091b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsHandshakeRequest.java @@ -31,6 +31,9 @@ public class IgfsHandshakeRequest extends IgfsMessage { /** Expected IGFS name. */ private String igfsName; + /** User name the request is done on behalf of. */ + private String userName; + /** Logger directory. */ private String logDir; @@ -90,4 +93,14 @@ public class IgfsHandshakeRequest extends IgfsMessage { @Override public String toString() { return S.toString(IgfsHandshakeRequest.class, this); } + + public final String userName() { + assert userName != null; + + return userName; + } + + public final void userName(String userName) { + this.userName = U.fixUserName(userName); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java index 11af716..e1a5d00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java @@ -73,6 +73,7 @@ public class IgfsMarshaller { } /** + * Serializes the message and sends it into the given output stream. * @param msg Message. * @param hdr Message header. * @param out Output. @@ -89,6 +90,7 @@ public class IgfsMarshaller { IgfsHandshakeRequest req = (IgfsHandshakeRequest)msg; + U.writeString(out, req.userName()); U.writeString(out, req.gridName()); U.writeString(out, req.igfsName()); U.writeString(out, req.logDirectory()); @@ -119,6 +121,7 @@ public class IgfsMarshaller { IgfsPathControlRequest req = (IgfsPathControlRequest)msg; + U.writeString(out, req.userName()); writePath(out, req.path()); writePath(out, req.destinationPath()); out.writeBoolean(req.flag()); @@ -205,6 +208,7 @@ public class IgfsMarshaller { case HANDSHAKE: { IgfsHandshakeRequest req = new IgfsHandshakeRequest(); + req.userName(U.readString(in)); req.gridName(U.readString(in)); req.igfsName(U.readString(in)); req.logDirectory(U.readString(in)); @@ -236,6 +240,7 @@ public class IgfsMarshaller { case OPEN_CREATE: { IgfsPathControlRequest req = new IgfsPathControlRequest(); + req.userName(U.readString(in)); req.path(readPath(in)); req.destinationPath(readPath(in)); req.flag(in.readBoolean()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java index 7ed1619..99a946a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java @@ -63,6 +63,8 @@ public class IgfsPathControlRequest extends IgfsMessage { /** Last modification time. */ private long modificationTime; + private String userName; + /** * @param path Path. */ @@ -235,4 +237,14 @@ public class IgfsPathControlRequest extends IgfsMessage { @Override public String toString() { return S.toString(IgfsPathControlRequest.class, this, "cmd", command()); } + + public final String userName() { + assert userName != null; + + return userName; + } + + public final void userName(String userName) { + this.userName = U.fixUserName(userName); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java index 65cb48d..5fd6c81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java @@ -98,5 +98,5 @@ public interface HadoopJob { /** * Cleans up the job staging directory. */ - void cleanupStagingDirectory(); + public void cleanupStagingDirectory(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java index 48a32f4..1ae6446 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java @@ -314,4 +314,14 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter impleme @Override public IgfsSecondaryFileSystem asSecondary() { return igfs.asSecondary(); } + + /** {@inheritDoc} */ + @Override public IgfsEx forUser(String userName) throws IgniteCheckedException { + return igfs.forUser(userName); + } + + /** {@inheritDoc} */ + @Override public String user() { + return igfs.user(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java index 99f647e..5f73407 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java @@ -169,4 +169,18 @@ public interface IgfsEx extends IgniteFileSystem { * @return Secondary file system wrapper. */ public IgfsSecondaryFileSystem asSecondary(); + + /** + * TODO + * @param userName + * @return + * @throws IgniteCheckedException + */ + public IgfsEx forUser(@Nullable String userName) throws IgniteCheckedException; + + /** + * Getter for user name. + * @return user name. + */ + public String user(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 824f178..82ef628 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -42,6 +42,7 @@ import org.jetbrains.annotations.*; import org.jsr166.*; import java.io.*; +import java.lang.ref.*; import java.net.*; import java.util.*; import java.util.concurrent.*; @@ -56,7 +57,7 @@ import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.*; /** * Cache-based IGFS implementation. */ -public final class IgfsImpl implements IgfsEx { +public class IgfsImpl implements IgfsEx { /** Default permissions for file system entry. */ private static final String PERMISSION_DFLT_VAL = "0777"; @@ -64,7 +65,7 @@ public final class IgfsImpl implements IgfsEx { private static final Map DFLT_DIR_META = F.asMap(PROP_PERMISSION, PERMISSION_DFLT_VAL); /** Handshake message. */ - private final IgfsPaths secondaryPaths; + private IgfsPaths secondaryPaths; /** Cache based structure (meta data) manager. */ private IgfsMetaManager meta; @@ -73,7 +74,10 @@ public final class IgfsImpl implements IgfsEx { private IgfsDataManager data; /** FS configuration. */ - private FileSystemConfiguration cfg; + private final FileSystemConfiguration cfg; + + /** The user name this Fs works on behalf of. */ + private final String user; /** IGFS context. */ private IgfsContext igfsCtx; @@ -88,7 +92,7 @@ public final class IgfsImpl implements IgfsEx { private IgniteLogger log; /** Mode resolver. */ - private final IgfsModeResolver modeRslvr; + private IgfsModeResolver modeRslvr; /** Connection to the secondary file system. */ private IgfsSecondaryFileSystem secondaryFs; @@ -96,7 +100,7 @@ public final class IgfsImpl implements IgfsEx { /** Busy lock. */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); - /** Writers map. */ + /** Writers userToIgfsExMap. */ private final ConcurrentHashMap8 workerMap = new ConcurrentHashMap8<>(); /** Delete futures. */ @@ -120,6 +124,8 @@ public final class IgfsImpl implements IgfsEx { /** Eviction policy (if set). */ private IgfsPerBlockLruEvictionPolicy evictPlc; + private final ConcurrentMap> userToIgfsExMap = new ConcurrentHashMap<>(); + /** * Creates IGFS instance with given context. * @@ -137,10 +143,67 @@ public final class IgfsImpl implements IgfsEx { evts = igfsCtx.kernalContext().event(); meta = igfsCtx.meta(); data = igfsCtx.data(); - secondaryFs = cfg.getSecondaryFileSystem(); + + user = U.fixUserName(getSecondaryFsUser()); + + assert user != null; + + initIgfsSecondaryFileSystem(); + + // Check whether IGFS LRU eviction policy is set on data cache. + String dataCacheName = igfsCtx.configuration().getDataCacheName(); + + for (CacheConfiguration cacheCfg : igfsCtx.kernalContext().config().getCacheConfiguration()) { + if (F.eq(dataCacheName, cacheCfg.getName())) { + EvictionPolicy evictPlc = cacheCfg.getEvictionPolicy(); + + if (evictPlc != null & evictPlc instanceof IgfsPerBlockLruEvictionPolicy) + this.evictPlc = (IgfsPerBlockLruEvictionPolicy)evictPlc; + + break; + } + } + + topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name()); + + igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr); + igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); + } + + /** + * Gets secondary file system user, or null, if no secondary file system is present. + * @return the secondary file system user. + */ + @Nullable protected String getSecondaryFsUser() { + FileSystemConfiguration fsCfg = igfsCtx.configuration(); + + if (fsCfg == null) + return null; + + IgfsSecondaryFileSystem sec = fsCfg.getSecondaryFileSystem(); + + if (sec == null) + return null; + + return sec.getUser(); + } + + /** + * Initializes 2ndary file system. + * @throws IgniteCheckedException on error + */ + private void initIgfsSecondaryFileSystem() throws IgniteCheckedException { + // This is the "prototype" (in terms of GOF pattern) of the 2ndary Fs to be cloned + // for the specified user: + final IgfsSecondaryFileSystem secondaryFs0 = cfg.getSecondaryFileSystem(); + + if (secondaryFs0 == null) + secondaryFs = null; + else + secondaryFs = secondaryFs0.forUser(user); /* Default IGFS mode. */ - IgfsMode dfltMode; + final IgfsMode dfltMode; if (secondaryFs == null) { if (cfg.getDefaultMode() == PROXY) @@ -195,25 +258,6 @@ public final class IgfsImpl implements IgfsEx { secondaryPaths = new IgfsPaths(secondaryFs == null ? null : secondaryFs.properties(), dfltMode, modeRslvr.modesOrdered()); - - // Check whether IGFS LRU eviction policy is set on data cache. - String dataCacheName = igfsCtx.configuration().getDataCacheName(); - - for (CacheConfiguration cacheCfg : igfsCtx.kernalContext().config().getCacheConfiguration()) { - if (F.eq(dataCacheName, cacheCfg.getName())) { - EvictionPolicy evictPlc = cacheCfg.getEvictionPolicy(); - - if (evictPlc != null & evictPlc instanceof IgfsPerBlockLruEvictionPolicy) - this.evictPlc = (IgfsPerBlockLruEvictionPolicy)evictPlc; - - break; - } - } - - topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name()); - - igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr); - igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); } /** @@ -2096,4 +2140,66 @@ public final class IgfsImpl implements IgfsEx { else throw new IllegalStateException("Failed to perform IGFS action because grid is stopping."); } + + /** {@inheritDoc} */ + @Override public IgfsEx forUser(String userName) throws IgniteCheckedException { + final String userFixed = U.fixUserName(userName); + + if (this.user == userFixed) + return this; // same user + + Reference ref = userToIgfsExMap.get(userFixed); + + IgfsImpl val = (ref == null) ? null : ref.get(); + + if (val == null) { + // Create impl from the same context but with different user: + IgfsImpl newVal = new IgfsImpl(igfsCtx) { + @Override protected String getSecondaryFsUser() { + return userFixed; + } + }; + final Reference newRef = new WeakReference<>(newVal); + + while (true) { + ref = userToIgfsExMap.get(userFixed); + + if (ref == null) { + ref = userToIgfsExMap.putIfAbsent(userFixed, newRef); + + if (ref == null) { + ref = newRef; + val = newVal; + + break; // ref replaced with newRef + } + } + + val = ref.get(); + + if (val != null) + break; // there is an existing value + + boolean replaced = userToIgfsExMap.replace(userFixed, ref, newRef); + + if (replaced) { + ref = newRef; + val = newVal; + + break; + } + } + } + + assert val == ref.get(); + assert val != null; + assert F.eq(val.user(), userFixed); + + return val; + } + + /** {@inheritDoc} */ + @Override public String user() { + return user; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java index 8a8b858..4812c95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java @@ -51,10 +51,10 @@ class IgfsIpcHandler implements IgfsServerHandler { private final int bufSize; // Buffer size. Must not be less then file block size. /** IGFS instance for this handler. */ - private IgfsEx igfs; + private final IgfsEx igfs; /** Resource ID generator. */ - private AtomicLong rsrcIdGen = new AtomicLong(); + private final AtomicLong rsrcIdGen = new AtomicLong(); /** Stopping flag. */ private volatile boolean stopping; @@ -250,77 +250,83 @@ class IgfsIpcHandler implements IgfsServerHandler { IgfsControlResponse res = new IgfsControlResponse(); + final String userName = req.userName(); + + assert userName != null; + + final IgfsEx userIgfs = igfs.forUser(userName); + try { switch (cmd) { case EXISTS: - res.response(igfs.exists(req.path())); + res.response(userIgfs.exists(req.path())); break; case INFO: - res.response(igfs.info(req.path())); + res.response(userIgfs.info(req.path())); break; case PATH_SUMMARY: - res.response(igfs.summary(req.path())); + res.response(userIgfs.summary(req.path())); break; case UPDATE: - res.response(igfs.update(req.path(), req.properties())); + res.response(userIgfs.update(req.path(), req.properties())); break; case RENAME: - igfs.rename(req.path(), req.destinationPath()); + userIgfs.rename(req.path(), req.destinationPath()); res.response(true); break; case DELETE: - res.response(igfs.delete(req.path(), req.flag())); + res.response(userIgfs.delete(req.path(), req.flag())); break; case MAKE_DIRECTORIES: - igfs.mkdirs(req.path(), req.properties()); + userIgfs.mkdirs(req.path(), req.properties()); res.response(true); break; case LIST_PATHS: - res.paths(igfs.listPaths(req.path())); + res.paths(userIgfs.listPaths(req.path())); break; case LIST_FILES: - res.files(igfs.listFiles(req.path())); + res.files(userIgfs.listFiles(req.path())); break; case SET_TIMES: - igfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); + userIgfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); res.response(true); break; case AFFINITY: - res.locations(igfs.affinity(req.path(), req.start(), req.length())); + res.locations(userIgfs.affinity(req.path(), req.start(), req.length())); break; case OPEN_READ: { - IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) : - igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); + IgfsInputStreamAdapter igfsIn = !req.flag() ? userIgfs.open(req.path(), bufSize) : + userIgfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); long streamId = registerResource(ses, igfsIn); if (log.isDebugEnabled()) - log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" + + log.debug("Opened IGFS input stream for file read [igfsName=" + userIgfs.name() + ", path=" + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null, @@ -332,7 +338,7 @@ class IgfsIpcHandler implements IgfsServerHandler { } case OPEN_CREATE: { - long streamId = registerResource(ses, igfs.create( + long streamId = registerResource(ses, userIgfs.create( req.path(), // Path. bufSize, // Buffer size. req.flag(), // Overwrite if exists. @@ -343,7 +349,7 @@ class IgfsIpcHandler implements IgfsServerHandler { )); if (log.isDebugEnabled()) - log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" + + log.debug("Opened IGFS output stream for file create [igfsName=" + userIgfs.name() + ", path=" + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); res.response(streamId); @@ -352,7 +358,7 @@ class IgfsIpcHandler implements IgfsServerHandler { } case OPEN_APPEND: { - long streamId = registerResource(ses, igfs.append( + long streamId = registerResource(ses, userIgfs.append( req.path(), // Path. bufSize, // Buffer size. req.flag(), // Create if absent. @@ -360,7 +366,7 @@ class IgfsIpcHandler implements IgfsServerHandler { )); if (log.isDebugEnabled()) - log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" + + log.debug("Opened IGFS output stream for file append [igfsName=" + userIgfs.name() + ", path=" + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); res.response(streamId); @@ -379,7 +385,7 @@ class IgfsIpcHandler implements IgfsServerHandler { } if (log.isDebugEnabled()) - log.debug("Finished processing path control request [igfsName=" + igfs.name() + ", req=" + req + + log.debug("Finished processing path control request [igfsName=" + userIgfs.name() + ", req=" + req + ", res=" + res + ']'); return res; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java index 683b317..23431a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java @@ -30,14 +30,14 @@ import java.util.*; */ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem { /** Delegate. */ - private final IgfsImpl igfs; + private final IgfsEx igfs; /** * Constructor. * * @param igfs Delegate. */ - IgfsSecondaryFileSystemImpl(IgfsImpl igfs) { + IgfsSecondaryFileSystemImpl(IgfsEx igfs) { this.igfs = igfs; } @@ -118,4 +118,16 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem { @Override public Map properties() { return Collections.emptyMap(); } + + /** {@inheritDoc} */ + @Override public IgfsSecondaryFileSystem forUser(String userName) throws IgniteCheckedException { + IgfsEx forUser = igfs.forUser(userName); + + return new IgfsSecondaryFileSystemImpl(forUser); + } + + /** {@inheritDoc} */ + @Override public String getUser() { + return igfs.user(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java index 253d5be..caa6866 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java @@ -239,13 +239,13 @@ public class IgfsServer { */ private class ClientWorker extends GridWorker { /** Connected client endpoint. */ - private IpcEndpoint endpoint; + private final IpcEndpoint endpoint; /** Data output stream. */ private final IgfsDataOutputStream out; /** Client session object. */ - private IgfsClientSession ses; + private final IgfsClientSession ses; /** Queue node for fast unlink. */ private ConcurrentLinkedDeque8.Node node; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 1aac985..55785e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9005,4 +9005,16 @@ public abstract class IgniteUtils { return hasShmem; } + + /** + * Provides non-null interned user name + * @param user a user name. + * @return non-null interned user name + */ + public static String fixUserName(@Nullable String user) { + if (user == null) + return ""; + + return user.intern(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 66e9761..80f693e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -20,10 +20,12 @@ package org.apache.ignite.hadoop.fs; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.*; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.util.typedef.*; import java.io.*; @@ -72,7 +74,9 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); try { - FileSystem fs = jobStatPath.getFileSystem(hadoopCfg); + hadoopCfg.set(MRJobConfig.USER_NAME, user); + + FileSystem fs = HadoopV2JobResourceManager.fileSystemForUser(jobStatPath.toUri(), hadoopCfg); fs.mkdirs(jobStatPath); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index ba891f8..3c9cc8a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -20,7 +20,6 @@ package org.apache.ignite.hadoop.fs; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.ipc.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; import org.apache.ignite.igfs.secondary.*; @@ -43,7 +42,11 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** Hadoop file system. */ private final FileSystem fileSys; - /** Properties of file system */ + /** Properties of file system, see {@link #properties()} + * See {@link IgfsEx#SECONDARY_FS_USER_NAME} + * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH} + * See {@link IgfsEx#SECONDARY_FS_URI} + * */ private final Map props = new HashMap<>(); /** @@ -131,14 +134,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @param detailMsg Detailed error message. * @return Appropriate exception. */ - @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"}) private IgfsException handleSecondaryFsError(IOException e, String detailMsg) { - boolean wrongVer = X.hasCause(e, RemoteException.class) || - (e.getMessage() != null && e.getMessage().contains("Failed on local")); - - return !wrongVer ? cast(detailMsg, e) : - new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " + - "version.", e); } + return cast(detailMsg, e); + } /** * Cast IO exception to IGFS exception. @@ -450,4 +448,30 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys public FileSystem fileSystem() { return fileSys; } + + /** + * TODO + * @param user2 + * @return + * @throws IgniteCheckedException + */ + @Override public IgfsSecondaryFileSystem forUser(String user2) throws IgniteCheckedException { + String user = props.get(SECONDARY_FS_USER_NAME); + if (F.eq(user, user2)) + return this; + else { + String uri = props.get(SECONDARY_FS_URI); + String cfgPath = props.get(SECONDARY_FS_CONFIG_PATH); + + return new IgniteHadoopIgfsSecondaryFileSystem(uri, cfgPath, user2); + } + } + + /** + * TODO + * @return + */ + @Override public String getUser() { + return props.get(SECONDARY_FS_USER_NAME); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 1f53a06..2dd7ddb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; import org.apache.hadoop.hdfs.*; import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.security.*; import org.apache.hadoop.util.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; @@ -97,21 +98,22 @@ public class IgniteHadoopFileSystem extends FileSystem { /** Grid remote client. */ private HadoopIgfsWrapper rmtClient; - /** User name for each thread. */ - private final ThreadLocal userName = new ThreadLocal(){ - /** {@inheritDoc} */ - @Override protected String initialValue() { - return DFLT_USER_NAME; - } - }; - - /** Working directory for each thread. */ - private final ThreadLocal workingDir = new ThreadLocal(){ - /** {@inheritDoc} */ - @Override protected Path initialValue() { - return getHomeDirectory(); - } - }; +// /** User name for each thread. */ +// private final ThreadLocal userName = new ThreadLocal(){ +// /** {@inheritDoc} */ +// @Override protected String initialValue() { +// return DFLT_USER_NAME; +// } +// }; + +// /** Working directory for each thread. */ +// private final ThreadLocal workingDir = new ThreadLocal(){ +// /** {@inheritDoc} */ +// @Override protected Path initialValue() { +// return getHomeDirectory(); +// } +// }; + private Path workingDir; /** Default replication factor. */ private short dfltReplication; @@ -129,6 +131,9 @@ public class IgniteHadoopFileSystem extends FileSystem { /** Secondary URI string. */ private URI secondaryUri; + /** The user name this file system was created on behalf of. */ + private String user; + /** IGFS mode resolver. */ private IgfsModeResolver modeRslvr; @@ -182,6 +187,37 @@ public class IgniteHadoopFileSystem extends FileSystem { } /** + * Gets non-null and interned user name as per the Hadoop viewpoint. + * @param cfg the Hadoop job configuration, may be null. + * @return the user name, never null. + */ + public static String getHadoopUser(@Nullable Configuration cfg) throws IOException { + String user = null; + + // First, try to get the user from MR Job configuration: + if (cfg != null) + user = cfg.get(MRJobConfig.USER_NAME); + + // 2nd, try to get it from UserGroupInformation (may return any result if we're + // inside UserGroupInformation.doAs(...) closure): + if (user == null) { + UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser(); + if (currentUgi != null) + user = currentUgi.getShortUserName(); + } + + // 3rd, get the default system (process owner) user name (defaults to "anonymous" in case of null): + if (user == null) + user = DFLT_USER_NAME; + + assert user != null; + + user = U.fixUserName(user); + + return user; + } + + /** * Public setter that can be used by direct users of FS or Visor. * * @param colocateFileWrites Whether all ongoing file writes should be colocated. @@ -221,7 +257,9 @@ public class IgniteHadoopFileSystem extends FileSystem { uriAuthority = uri.getAuthority(); - setUser(cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); + user = getHadoopUser(cfg); + + //setUser(user); // Override sequential reads before prefetch if needed. seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); @@ -244,7 +282,7 @@ public class IgniteHadoopFileSystem extends FileSystem { String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG); + rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user); // Handshake. IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); @@ -289,11 +327,10 @@ public class IgniteHadoopFileSystem extends FileSystem { String secUri = props.get(SECONDARY_FS_URI); String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - String secUserName = props.get(SECONDARY_FS_USER_NAME); try { SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath, - secUserName); + user); secondaryFs = secProvider.createFileSystem(); secondaryUri = secProvider.uri(); @@ -306,6 +343,9 @@ public class IgniteHadoopFileSystem extends FileSystem { "will have no effect): " + e.getMessage()); } } + + // set working directory to the hone directory of the current Fs user: + setWorkingDirectory(null); } finally { leaveBusy(); @@ -849,7 +889,7 @@ public class IgniteHadoopFileSystem extends FileSystem { /** {@inheritDoc} */ @Override public Path getHomeDirectory() { - Path path = new Path("/user/" + userName.get()); + Path path = new Path("/user/" + user/*userName.get()*/); return path.makeQualified(getUri(), null); } @@ -859,10 +899,13 @@ public class IgniteHadoopFileSystem extends FileSystem { * * @param userName User name. */ + @Deprecated // TODO: remove this method. public void setUser(String userName) { - this.userName.set(userName); + //System.out.println(this + ": ##### setting user = " + userName + ", thread = " + Thread.currentThread()); + assert F.eq(user, userName); + //this.userName.set(userName); - setWorkingDirectory(null); + //setWorkingDirectory(null); } /** {@inheritDoc} */ @@ -873,7 +916,7 @@ public class IgniteHadoopFileSystem extends FileSystem { if (secondaryFs != null) secondaryFs.setWorkingDirectory(toSecondary(homeDir)); - workingDir.set(homeDir); + workingDir = homeDir; //.set(homeDir); } else { Path fixedNewPath = fixRelativePart(newPath); @@ -886,13 +929,13 @@ public class IgniteHadoopFileSystem extends FileSystem { if (secondaryFs != null) secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath)); - workingDir.set(fixedNewPath); + workingDir = fixedNewPath; //.set(fixedNewPath); } } /** {@inheritDoc} */ @Override public Path getWorkingDirectory() { - return workingDir.get(); + return workingDir; //.get(); } /** {@inheritDoc} */ @@ -1153,7 +1196,7 @@ public class IgniteHadoopFileSystem extends FileSystem { return null; return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) : - new IgfsPath(convert(workingDir.get()), path.toUri().getPath()); + new IgfsPath(convert(workingDir), path.toUri().getPath()); } /** @@ -1191,9 +1234,16 @@ public class IgniteHadoopFileSystem extends FileSystem { */ @SuppressWarnings("deprecation") private FileStatus convert(IgfsFile file) { - return new FileStatus(file.length(), file.isDirectory(), getDefaultReplication(), - file.groupBlockSize(), file.modificationTime(), file.accessTime(), permission(file), - file.property(PROP_USER_NAME, DFLT_USER_NAME), file.property(PROP_GROUP_NAME, "users"), + return new FileStatus( + file.length(), + file.isDirectory(), + getDefaultReplication(), + file.groupBlockSize(), + file.modificationTime(), + file.accessTime(), + permission(file), + file.property(PROP_USER_NAME, user), + file.property(PROP_GROUP_NAME, "users"), convert(file.path())) { @Override public String toString() { return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() + @@ -1247,4 +1297,20 @@ public class IgniteHadoopFileSystem extends FileSystem { @Override public String toString() { return S.toString(IgniteHadoopFileSystem.class, this); } + + /** + * Returns the user name this File System is created on behalf of. + * @return the user name + */ + public String user() { + return user; + } + + /** + * Getter for secondaryFs field. + * @return the secondary file system, if any. + */ + public FileSystem secondaryFs() { + return secondaryFs; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index 9cfb79b..71413f5 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -22,7 +22,6 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.util.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; @@ -40,6 +39,7 @@ import java.util.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.configuration.FileSystemConfiguration.*; +import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.*; import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*; @@ -91,11 +91,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea /** Grid remote client. */ private HadoopIgfsWrapper rmtClient; + /** The name of the user this File System created on behalf of. */ + private final String user; + /** Working directory. */ private IgfsPath workingDir; /** URI. */ - private URI uri; + private final URI uri; /** Authority. */ private String uriAuthority; @@ -141,6 +144,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea uri = name; + user = getHadoopUser(cfg); + try { initialize(name, cfg); } @@ -152,7 +157,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea throw e; } - workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); + workingDir = new IgfsPath("/user/" + user); } /** {@inheritDoc} */ @@ -240,7 +245,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG); + rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user); // Handshake. IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); @@ -284,11 +289,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea String secUri = props.get(SECONDARY_FS_URI); String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - String secUserName = props.get(SECONDARY_FS_USER_NAME); try { SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath, - secUserName); + user); secondaryFs = secProvider.createAbstractFileSystem(); secondaryUri = secProvider.uri(); @@ -929,7 +933,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea file.modificationTime(), file.accessTime(), permission(file), - file.property(PROP_USER_NAME, DFLT_USER_NAME), + file.property(PROP_USER_NAME, user), file.property(PROP_GROUP_NAME, "users"), convert(file.path())) { @Override public String toString() { @@ -983,4 +987,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea @Override public String toString() { return S.toString(IgniteHadoopFileSystem.class, this); } + + /** + * Returns the user name this File System is created on behalf of. + * @return the user name + */ + public String user() { + return user; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 00be422..75c8a49 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -126,11 +126,13 @@ public class HadoopUtils { break; case PHASE_REDUCE: - assert status.totalReducerCnt() > 0; - setupProgress = 1; mapProgress = 1; - reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + + if (status.totalReducerCnt() > 0) + reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + else + reduceProgress = 1f; break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index 27805f8..ca4da3c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; +import org.apache.hadoop.security.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; import java.net.*; +import java.security.*; /** * Encapsulates logic of secondary filesystem creation. @@ -109,10 +111,28 @@ public class SecondaryFileSystemProvider { /** * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs. - * @throws IOException + * @throws IOException in case of error. */ public AbstractFileSystem createAbstractFileSystem() throws IOException { - return AbstractFileSystem.get(uri, cfg); + if (userName == null) + return AbstractFileSystem.get(uri, cfg); + else { + String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); + + UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName); + + try { + return ugi.doAs(new PrivilegedExceptionAction() { + @Override public AbstractFileSystem run() throws IOException { + return AbstractFileSystem.get(uri, cfg); + } + }); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", ie); + } + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java index 509f443..6204a2a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java @@ -1,91 +1,108 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.mapreduce.*; - -import java.io.*; -import java.net.*; - -import static org.apache.ignite.configuration.FileSystemConfiguration.*; - -/** - * Wrapper of HDFS for support of separated working directory. - */ -public class HadoopDistributedFileSystem extends DistributedFileSystem { - /** User name for each thread. */ - private final ThreadLocal userName = new ThreadLocal() { - /** {@inheritDoc} */ - @Override protected String initialValue() { - return DFLT_USER_NAME; - } - }; - - /** Working directory for each thread. */ - private final ThreadLocal workingDir = new ThreadLocal() { - /** {@inheritDoc} */ - @Override protected Path initialValue() { - return getHomeDirectory(); - } - }; - - /** {@inheritDoc} */ - @Override public void initialize(URI uri, Configuration conf) throws IOException { - super.initialize(uri, conf); - - setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); - } - - /** - * Set user name and default working directory for current thread. - * - * @param userName User name. - */ - public void setUser(String userName) { - this.userName.set(userName); - - setWorkingDirectory(getHomeDirectory()); - } - - /** {@inheritDoc} */ - @Override public Path getHomeDirectory() { - Path path = new Path("/user/" + userName.get()); - - return path.makeQualified(getUri(), null); - } - - /** {@inheritDoc} */ - @Override public void setWorkingDirectory(Path dir) { - Path fixedDir = fixRelativePart(dir); - - String res = fixedDir.toUri().getPath(); - - if (!DFSUtil.isValidName(res)) - throw new IllegalArgumentException("Invalid DFS directory name " + res); - - workingDir.set(fixedDir); - } - - /** {@inheritDoc} */ - @Override public Path getWorkingDirectory() { - return workingDir.get(); - } -} +///* +//* Licensed to the Apache Software Foundation (ASF) under one or more +//* contributor license agreements. See the NOTICE file distributed with +//* this work for additional information regarding copyright ownership. +//* The ASF licenses this file to You under the Apache License, Version 2.0 +//* (the "License"); you may not use this file except in compliance with +//* the License. You may obtain a copy of the License at +//* +//* http://www.apache.org/licenses/LICENSE-2.0 +//* +//* Unless required by applicable law or agreed to in writing, software +//* distributed under the License is distributed on an "AS IS" BASIS, +//* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//* See the License for the specific language governing permissions and +//* limitations under the License. +//*/ +// +//package org.apache.ignite.internal.processors.hadoop.fs; +// +//import org.apache.hadoop.conf.*; +//import org.apache.hadoop.fs.*; +//import org.apache.hadoop.hdfs.*; +//import org.apache.hadoop.security.*; +//import org.apache.ignite.internal.util.typedef.*; +// +//import java.io.*; +//import java.lang.reflect.*; +//import java.net.*; +// +//import static org.apache.ignite.configuration.FileSystemConfiguration.*; +//import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.*; +// +///** +//* Wrapper of HDFS for support of separated working directory. +//*/ +//public class HadoopDistributedFileSystem extends DistributedFileSystem { +//// /** User name for each thread. */ +//// private final ThreadLocal userName = new ThreadLocal() { +//// /** {@inheritDoc} */ +//// @Override protected String initialValue() { +//// return DFLT_USER_NAME; +//// } +//// }; +//// +//// /** Working directory for each thread. */ +//// private final ThreadLocal workingDir = new ThreadLocal() { +//// /** {@inheritDoc} */ +//// @Override protected Path initialValue() { +//// return getHomeDirectory(); +//// } +//// }; +// +// /** {@inheritDoc} */ +// @Override public void initialize(URI uri, Configuration conf) throws IOException { +// super.initialize(uri, conf); +// +// //setUser(getHadoopUser(conf)); +// } +// +// /** +// * Set user name and default working directory for current thread. +// * +// * @param userName User name. +// */ +// @Deprecated +// public void setUser(String userName) { +// //System.out.println(this + ": ##### setting user = " + userName + ", thread = " + Thread.currentThread()); +// +// DFSClient dfsClient = getClient(); +// +// try { +// Field f = DFSClient.class.getDeclaredField("ugi"); +// f.setAccessible(true); +// UserGroupInformation ugi = (UserGroupInformation)f.get(dfsClient); +// assert F.eq(ugi.getShortUserName(), userName); +// } catch (Throwable t) { +// t.printStackTrace(); +// } +// +// //this.userName.set(userName); +// +// //setWorkingDirectory(getHomeDirectory()); +// } +// +//// /** {@inheritDoc} */ +//// @Override public Path getHomeDirectory() { +//// Path path = new Path("/user/" + userName.get()); +//// +//// return path.makeQualified(getUri(), null); +//// } +//// +//// /** {@inheritDoc} */ +//// @Override public void setWorkingDirectory(Path dir) { +//// Path fixedDir = fixRelativePart(dir); +//// +//// String res = fixedDir.toUri().getPath(); +//// +//// if (!DFSUtil.isValidName(res)) +//// throw new IllegalArgumentException("Invalid DFS directory name " + res); +//// +//// workingDir.set(fixedDir); +//// } +//// +//// /** {@inheritDoc} */ +//// @Override public Path getWorkingDirectory() { +//// return workingDir.get(); +//// } +//} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java index f3f51d4..7631ae9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java @@ -29,18 +29,19 @@ public class HadoopFileSystemsUtils { /** Name of the property for setting working directory on create new local FS instance. */ public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; - /** - * Set user name and default working directory for current thread if it's supported by file system. - * - * @param fs File system. - * @param userName User name. - */ - public static void setUser(FileSystem fs, String userName) { - if (fs instanceof IgniteHadoopFileSystem) - ((IgniteHadoopFileSystem)fs).setUser(userName); - else if (fs instanceof HadoopDistributedFileSystem) - ((HadoopDistributedFileSystem)fs).setUser(userName); - } +// /** +// * Set user name and default working directory for current thread if it's supported by file system. +// * +// * @param fs File system. +// * @param userName User name. +// */ +// @Deprecated // TODO: remove this method. +// public static void setUser(FileSystem fs, String userName) { +// if (fs instanceof IgniteHadoopFileSystem) +// ((IgniteHadoopFileSystem)fs).setUser(userName); +//// else if (fs instanceof HadoopDistributedFileSystem) +//// ((HadoopDistributedFileSystem)fs).setUser(userName); +// } /** * Setup wrappers of filesystems to support the separate working directory. @@ -52,6 +53,7 @@ public class HadoopFileSystemsUtils { cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV2.class.getName()); - cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName()); +// // TODO: this should be removed: +// cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java index 2f19226..b9c5113 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java @@ -85,4 +85,10 @@ public interface HadoopIgfsEx extends HadoopIgfs { * @throws IOException If failed. */ public void flush(HadoopIgfsStreamDelegate delegate) throws IOException; + + /** + * The user this Igfs instance works on behalf of. + * @return the user name. + */ + public String user(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java index 44e531e..df17198 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java @@ -46,14 +46,22 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** Logger. */ private final Log log; + /** The user this Igfs works on behalf of. */ + private final String user; + /** * Constructor. * * @param igfs Target IGFS. * @param log Log. */ - public HadoopIgfsInProc(IgfsEx igfs, Log log) { - this.igfs = igfs; + public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException { + this.user = userName; + + this.igfs = igfs.forUser(userName); + + assert this.user == this.igfs.user(); + this.log = log; bufSize = igfs.configuration().getBlockSize() * 2; @@ -407,4 +415,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { if (lsnr0 != null && log.isDebugEnabled()) log.debug("Removed stream event listener [delegate=" + delegate + ']'); } + + /** {@inheritDoc} */ + @Override public String user() { + return user; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java index 7d590c7..856a4fb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java @@ -41,7 +41,7 @@ import java.util.concurrent.locks.*; @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") public class HadoopIgfsIpcIo implements HadoopIgfsIo { /** Logger. */ - private Log log; + private final Log log; /** Request futures map. */ private ConcurrentMap reqMap = http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java index 7dca049..9cee867 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java @@ -81,6 +81,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener /** IGFS name. */ private final String igfs; + /** The user this out proc is performing on behalf of. */ + private final String userName; + /** Client log. */ private final Log log; @@ -100,8 +103,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. */ - public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException { - this(host, port, grid, igfs, false, log); + public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException { + this(host, port, grid, igfs, false, log, user); } /** @@ -113,8 +116,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. */ - public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException { - this(null, port, grid, igfs, true, log); + public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException { + this(null, port, grid, igfs, true, log, user); } /** @@ -128,7 +131,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. */ - private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log) + private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user) throws IOException { assert host != null && !shmem || host == null && shmem : "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']'; @@ -138,6 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener this.grid = grid; this.igfs = igfs; this.log = log; + this.userName = user; io = HadoopIgfsIpcIo.get(log, endpoint); @@ -148,6 +152,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException { final IgfsHandshakeRequest req = new IgfsHandshakeRequest(); + req.userName(userName); req.gridName(grid); req.igfsName(igfs); req.logDirectory(logDir); @@ -173,6 +178,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(INFO); msg.path(path); + msg.userName(userName); return io.send(msg).chain(FILE_RES).get(); } @@ -184,6 +190,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(UPDATE); msg.path(path); msg.properties(props); + msg.userName(userName); return io.send(msg).chain(FILE_RES).get(); } @@ -196,6 +203,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.accessTime(accessTime); msg.modificationTime(modificationTime); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -207,6 +215,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(RENAME); msg.path(src); msg.destinationPath(dest); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -218,6 +227,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(DELETE); msg.path(path); msg.flag(recursive); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -231,6 +241,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.start(start); msg.length(len); + msg.userName(userName); return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get(); } @@ -241,6 +252,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(PATH_SUMMARY); msg.path(path); + msg.userName(userName); return io.send(msg).chain(SUMMARY_RES).get(); } @@ -252,6 +264,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(MAKE_DIRECTORIES); msg.path(path); msg.properties(props); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -262,6 +275,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(LIST_FILES); msg.path(path); + msg.userName(userName); return io.send(msg).chain(FILE_COL_RES).get(); } @@ -272,6 +286,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(LIST_PATHS); msg.path(path); + msg.userName(userName); return io.send(msg).chain(PATH_COL_RES).get(); } @@ -288,6 +303,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(OPEN_READ); msg.path(path); msg.flag(false); + msg.userName(userName); IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); @@ -303,6 +319,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.flag(true); msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch); + msg.userName(userName); IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); @@ -321,6 +338,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.properties(props); msg.replication(replication); msg.blockSize(blockSize); + msg.userName(userName); Long streamId = io.send(msg).chain(LONG_RES).get(); @@ -336,6 +354,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.flag(create); msg.properties(props); + msg.userName(userName); Long streamId = io.send(msg).chain(LONG_RES).get(); @@ -471,4 +490,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener } }; } + + /** {@inheritDoc} */ + @Override public String user() { + return userName; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java index 1dada21..1574152 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java @@ -55,6 +55,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs { /** Logger. */ private final Log log; + private final String userName; + /** * Constructor. * @@ -63,13 +65,14 @@ public class HadoopIgfsWrapper implements HadoopIgfs { * @param conf Configuration. * @param log Current logger. */ - public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException { + public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) throws IOException { try { this.authority = authority; this.endpoint = new HadoopIgfsEndpoint(authority); this.logDir = logDir; this.conf = conf; this.log = log; + this.userName = user; } catch (IgniteCheckedException e) { throw new IOException("Failed to parse endpoint: " + authority, e); @@ -362,7 +365,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsInProc(igfs, log); + hadoop = new HadoopIgfsInProc(igfs, log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } @@ -384,7 +387,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } @@ -409,7 +412,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { try { hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), - log); + log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } @@ -430,7 +433,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), + log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java index 2b36267..9d7125a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -17,13 +17,21 @@ package org.apache.ignite.internal.processors.hadoop.taskexecutor; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.security.*; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import java.io.*; +import java.security.*; import java.util.*; import java.util.concurrent.*; @@ -99,6 +107,44 @@ public abstract class HadoopRunnableTask implements Callable { /** {@inheritDoc} */ @Override public Void call() throws IgniteCheckedException { + final String user = job.info().user(); + + assert !F.isEmpty(user); + + if (F.eq(user, FileSystemConfiguration.DFLT_USER_NAME)) + // do direct call: + return callImpl(); + else { + // do the call in the context of 'user': + try { + final String ticketCachePath; + + if (job instanceof HadoopV2Job) { + Configuration conf = ((HadoopV2Job)job).jobConf(); + + ticketCachePath = conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); + } else + ticketCachePath = job.info().property(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); + + UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user); + + return ugi.doAs(new PrivilegedExceptionAction() { + @Override public Void run() throws IgniteCheckedException { + return callImpl(); + } + }); + } catch (IOException | InterruptedException e) { + throw new IgniteCheckedException(e); + } + } + } + + /** + * Runnable task call implementation + * @return + * @throws IgniteCheckedException + */ + Void callImpl() throws IgniteCheckedException { execStartTs = U.currentTimeMillis(); Throwable err = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index e3c2bfa..ac10687 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -40,6 +40,7 @@ import java.util.Queue; import java.util.concurrent.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; +import static org.apache.ignite.internal.processors.hadoop.v2.HadoopV2JobResourceManager.*; /** * Hadoop job implementation for v2 API. @@ -68,7 +69,7 @@ public class HadoopV2Job implements HadoopJob { new ConcurrentHashMap8<>(); /** Pooling task context class and thus class loading environment. */ - private final Queue> taskCtxClsPool = new ConcurrentLinkedQueue<>(); + private final Queue> taskCtxClsPool = new ConcurrentLinkedQueue<>(); /** All created contexts. */ private final Queue> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); @@ -139,7 +140,7 @@ public class HadoopV2Job implements HadoopJob { Path jobDir = new Path(jobDirPath); - try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) { + try (FileSystem fs = fileSystemForUser(jobDir.toUri(), jobConf)) { JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, jobDir); @@ -194,7 +195,7 @@ public class HadoopV2Job implements HadoopJob { if (old != null) return old.get(); - Class cls = taskCtxClsPool.poll(); + Class cls = taskCtxClsPool.poll(); try { if (cls == null) { @@ -204,7 +205,7 @@ public class HadoopV2Job implements HadoopJob { HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); - cls = ldr.loadClass(HadoopV2TaskContext.class.getName()); + cls = (Class)ldr.loadClass(HadoopV2TaskContext.class.getName()); fullCtxClsQueue.add(cls); } @@ -263,6 +264,8 @@ public class HadoopV2Job implements HadoopJob { if (jobLocDir.exists()) U.delete(jobLocDir); } +// +// disposeFileSystem(); } finally { taskCtxClsPool.clear(); @@ -297,6 +300,25 @@ public class HadoopV2Job implements HadoopJob { } } +// /** +// * Closes the underlying file system. +// * @throws IgniteCheckedException on error. +// */ +// private void disposeFileSystem() throws IgniteCheckedException { +// FileSystem fs0 = fs; +// +// try { +// if (fs0 != null) +// fs0.close(); +// } +// catch (IOException ioe) { +// throw new IgniteCheckedException(ioe); +// } +// finally { +// fs = null; +// } +// } + /** {@inheritDoc} */ @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info)); @@ -319,4 +341,12 @@ public class HadoopV2Job implements HadoopJob { if (rsrcMgr != null) rsrcMgr.cleanupStagingDirectory(); } + + /** + * Getter for job configuration. + * @return + */ + public JobConf jobConf() { + return jobConf; + } }