Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 92DA4183E7 for ; Mon, 11 Jan 2016 15:27:39 +0000 (UTC) Received: (qmail 44963 invoked by uid 500); 11 Jan 2016 15:27:23 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 44778 invoked by uid 500); 11 Jan 2016 15:27:22 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 41444 invoked by uid 99); 11 Jan 2016 15:26:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Jan 2016 15:26:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 08B06E0F5A; Mon, 11 Jan 2016 15:26:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Mon, 11 Jan 2016 15:27:41 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [48/50] [abbrv] ignite git commit: IGNITE-2206: Hadoop file system creation is now abstracted out using factory interface. IGNITE-2206: Hadoop file system creation is now abstracted out using factory interface. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ed73b4a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ed73b4a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ed73b4a Branch: refs/heads/ignite-2234 Commit: 8ed73b4af8024167daeb4775e084b1f6a23fbf13 Parents: 7d58d14 Author: vozerov-gridgain Authored: Tue Jan 5 10:59:31 2016 +0400 Committer: vozerov-gridgain Committed: Tue Jan 5 10:59:31 2016 +0400 ---------------------------------------------------------------------- .../org/apache/ignite/igfs/IgfsUserContext.java | 16 +- .../igfs/secondary/IgfsSecondaryFileSystem.java | 14 - .../processors/hadoop/HadoopPayloadAware.java | 28 ++ .../ignite/internal/processors/igfs/IgfsEx.java | 13 - .../internal/processors/igfs/IgfsImpl.java | 16 +- .../internal/processors/igfs/IgfsPaths.java | 62 +++- .../igfs/IgfsSecondaryFileSystemImpl.java | 11 - .../visor/node/VisorIgfsConfiguration.java | 43 --- .../processors/igfs/IgfsAbstractSelfTest.java | 8 +- .../igfs/IgfsExUniversalFileSystemAdapter.java | 11 +- .../igfs/UniversalFileSystemAdapter.java | 5 +- .../hadoop/fs/BasicHadoopFileSystemFactory.java | 209 ++++++++++++ .../fs/CachingHadoopFileSystemFactory.java | 86 +++++ .../hadoop/fs/HadoopFileSystemFactory.java | 52 +++ .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 264 +++++++-------- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 144 +++++--- .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 115 ++++--- .../hadoop/SecondaryFileSystemProvider.java | 139 -------- .../hadoop/fs/HadoopFileSystemCacheUtils.java | 8 +- .../hadoop/fs/HadoopLazyConcurrentMap.java | 5 +- .../ignite/igfs/Hadoop1DualAbstractTest.java | 14 +- .../igfs/HadoopFIleSystemFactorySelfTest.java | 326 +++++++++++++++++++ ...oopFileSystemUniversalFileSystemAdapter.java | 53 +-- ...oopSecondaryFileSystemConfigurationTest.java | 27 +- .../IgniteHadoopFileSystemAbstractSelfTest.java | 71 ++-- 
.../testsuites/IgniteHadoopTestSuite.java | 3 + 26 files changed, 1191 insertions(+), 552 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java index 8db4e23..1e1cd31 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java @@ -34,24 +34,24 @@ public abstract class IgfsUserContext { * The main contract of this method is that {@link #currentUser()} method invoked * inside closure always returns 'user' this callable executed with. * @param user the user name to invoke closure on behalf of. - * @param clo the closure to execute + * @param c the closure to execute * @param The type of closure result. * @return the result of closure execution. * @throws IllegalArgumentException if user name is null or empty String or if the closure is null. */ - public static T doAs(String user, final IgniteOutClosure clo) { + public static T doAs(String user, final IgniteOutClosure c) { if (F.isEmpty(user)) throw new IllegalArgumentException("Failed to use null or empty user name."); final String ctxUser = userStackThreadLocal.get(); if (F.eq(ctxUser, user)) - return clo.apply(); // correct context is already there + return c.apply(); // correct context is already there userStackThreadLocal.set(user); try { - return clo.apply(); + return c.apply(); } finally { userStackThreadLocal.set(ctxUser); @@ -81,24 +81,24 @@ public abstract class IgfsUserContext { * } * * @param user the user name to invoke closure on behalf of. 
- * @param clbl the Callable to execute + * @param c the Callable to execute * @param The type of callable result. * @return the result of closure execution. * @throws IllegalArgumentException if user name is null or empty String or if the closure is null. */ - public static T doAs(String user, final Callable clbl) throws Exception { + public static T doAs(String user, final Callable c) throws Exception { if (F.isEmpty(user)) throw new IllegalArgumentException("Failed to use null or empty user name."); final String ctxUser = userStackThreadLocal.get(); if (F.eq(ctxUser, user)) - return clbl.call(); // correct context is already there + return c.call(); // correct context is already there userStackThreadLocal.set(user); try { - return clbl.call(); + return c.call(); } finally { userStackThreadLocal.set(ctxUser); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java index ca6ecb7..3f124eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java @@ -192,18 +192,4 @@ public interface IgfsSecondaryFileSystem { * @throws IgniteException In case of error. */ public long usedSpaceSize() throws IgniteException; - - /** - * Gets the implementation specific properties of file system. - * - * @return Map of properties. - */ - public Map properties(); - - - /** - * Closes the secondary file system. - * @throws IgniteException in case of an error. 
- */ - public void close() throws IgniteException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java new file mode 100644 index 0000000..9b79729 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPayloadAware.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +/** + * Gets payload for Hadoop secondary file system. + */ +public interface HadoopPayloadAware { + /** + * @return Payload. 
+ */ + public Object getPayload(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java index 8ff7247..cf268e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java @@ -43,19 +43,6 @@ public interface IgfsEx extends IgniteFileSystem { /** File property: prefer writes to local node. */ public static final String PROP_PREFER_LOCAL_WRITES = "locWrite"; - /** Property name for path to Hadoop configuration. */ - public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH"; - - /** Property name for URI of file system. */ - public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI"; - - /** Property name for default user name of file system. - * NOTE: for secondary file system this is just a default user name, which is used - * when the 2ndary filesystem is used outside of any user context. - * If another user name is set in the context, 2ndary file system will work on behalf - * of that user, which is different from the default. */ - public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME"; - /** * Stops IGFS cleaning all used resources. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 680e660..38914ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -87,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; @@ -200,6 +202,9 @@ public final class IgfsImpl implements IgfsEx { data = igfsCtx.data(); secondaryFs = cfg.getSecondaryFileSystem(); + if (secondaryFs instanceof LifecycleAware) + ((LifecycleAware) secondaryFs).start(); + /* Default IGFS mode. 
*/ IgfsMode dfltMode; @@ -256,8 +261,12 @@ public final class IgfsImpl implements IgfsEx { modeRslvr = new IgfsModeResolver(dfltMode, modes); - secondaryPaths = new IgfsPaths(secondaryFs == null ? null : secondaryFs.properties(), dfltMode, - modeRslvr.modesOrdered()); + Object secondaryFsPayload = null; + + if (secondaryFs instanceof HadoopPayloadAware) + secondaryFsPayload = ((HadoopPayloadAware) secondaryFs).getPayload(); + + secondaryPaths = new IgfsPaths(secondaryFsPayload, dfltMode, modeRslvr.modesOrdered()); // Check whether IGFS LRU eviction policy is set on data cache. String dataCacheName = igfsCtx.configuration().getDataCacheName(); @@ -305,7 +314,8 @@ public final class IgfsImpl implements IgfsEx { batch.cancel(); try { - secondaryFs.close(); + if (secondaryFs instanceof LifecycleAware) + ((LifecycleAware)secondaryFs).stop(); } catch (Exception e) { log.error("Failed to close secondary file system.", e); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java index fbf89ce..4a79259 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java @@ -17,17 +17,21 @@ package org.apache.ignite.internal.processors.igfs; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.ArrayList; import java.util.List; -import java.util.Map; + +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.igfs.IgfsMode; import 
org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.jetbrains.annotations.Nullable; /** @@ -37,8 +41,8 @@ public class IgfsPaths implements Externalizable { /** */ private static final long serialVersionUID = 0L; - /** Additional secondary file system properties. */ - private Map props; + /** */ + private byte[] payloadBytes; /** Default IGFS mode. */ private IgfsMode dfltMode; @@ -56,22 +60,25 @@ public class IgfsPaths implements Externalizable { /** * Constructor. * - * @param props Additional secondary file system properties. + * @param payload Payload. * @param dfltMode Default IGFS mode. * @param pathModes Path modes. + * @throws IgniteCheckedException If failed. */ - public IgfsPaths(Map props, IgfsMode dfltMode, @Nullable List> pathModes) { - this.props = props; + public IgfsPaths(Object payload, IgfsMode dfltMode, @Nullable List> pathModes) + throws IgniteCheckedException { this.dfltMode = dfltMode; this.pathModes = pathModes; - } - /** - * @return Secondary file system properties. - */ - public Map properties() { - return props; + if (payload == null) + payloadBytes = null; + else { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + new JdkMarshaller().marshal(payload, out); + + payloadBytes = out.toByteArray(); + } } /** @@ -88,9 +95,25 @@ public class IgfsPaths implements Externalizable { return pathModes; } + /** + * @return Payload. + * + * @throws IgniteCheckedException If failed to deserialize the payload. 
+ */ + @Nullable public Object getPayload(ClassLoader clsLdr) throws IgniteCheckedException { + if (payloadBytes == null) + return null; + else { + ByteArrayInputStream in = new ByteArrayInputStream(payloadBytes); + + return new JdkMarshaller().unmarshal(in, clsLdr); + } + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeStringMap(out, props); + U.writeByteArray(out, payloadBytes); + U.writeEnum(out, dfltMode); if (pathModes != null) { @@ -98,7 +121,10 @@ public class IgfsPaths implements Externalizable { out.writeInt(pathModes.size()); for (T2 pathMode : pathModes) { + assert pathMode.getKey() != null; + pathMode.getKey().writeExternal(out); + U.writeEnum(out, pathMode.getValue()); } } @@ -108,7 +134,8 @@ public class IgfsPaths implements Externalizable { /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - props = U.readStringMap(in); + payloadBytes = U.readByteArray(in); + dfltMode = IgfsMode.fromOrdinal(in.readByte()); if (in.readBoolean()) { @@ -118,11 +145,10 @@ public class IgfsPaths implements Externalizable { for (int i = 0; i < size; i++) { IgfsPath path = new IgfsPath(); - path.readExternal(in); - T2 entry = new T2<>(path, IgfsMode.fromOrdinal(in.readByte())); + path.readExternal(in); - pathModes.add(entry); + pathModes.add(new T2<>(path, IgfsMode.fromOrdinal(in.readByte()))); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java index 23d6322..44e858f 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.igfs; import java.io.OutputStream; import java.util.Collection; -import java.util.Collections; import java.util.Map; import org.apache.ignite.IgniteException; import org.apache.ignite.igfs.IgfsFile; @@ -116,14 +115,4 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem { @Override public long usedSpaceSize() throws IgniteException { return igfs.usedSpaceSize(); } - - /** {@inheritDoc} */ - @Override public Map properties() { - return Collections.emptyMap(); - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteException { - // No-op. - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java index e85484d..ea0e721 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java @@ -29,9 +29,6 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_USER_NAME; import 
static org.apache.ignite.internal.visor.util.VisorTaskUtils.compactClass; /** @@ -65,15 +62,6 @@ public class VisorIgfsConfiguration implements Serializable { /** Number of batches that can be concurrently sent to remote node. */ private int perNodeParallelBatchCnt; - /** URI of the secondary Hadoop file system. */ - private String secondaryHadoopFileSysUri; - - /** Path for the secondary hadoop file system config. */ - private String secondaryHadoopFileSysCfgPath; - - /** User name for the secondary hadoop file system config. */ - private String secondaryHadoopFileSysUserName; - /** IGFS instance mode. */ private IgfsMode dfltMode; @@ -141,16 +129,6 @@ public class VisorIgfsConfiguration implements Serializable { cfg.perNodeBatchSize = igfs.getPerNodeBatchSize(); cfg.perNodeParallelBatchCnt = igfs.getPerNodeParallelBatchCount(); - IgfsSecondaryFileSystem secFs = igfs.getSecondaryFileSystem(); - - if (secFs != null) { - Map props = secFs.properties(); - - cfg.secondaryHadoopFileSysUri = props.get(SECONDARY_FS_URI); - cfg.secondaryHadoopFileSysCfgPath = props.get(SECONDARY_FS_CONFIG_PATH); - cfg.secondaryHadoopFileSysUserName = props.get(SECONDARY_FS_USER_NAME); - } - cfg.dfltMode = igfs.getDefaultMode(); cfg.pathModes = igfs.getPathModes(); cfg.dualModePutExecutorSrvc = compactClass(igfs.getDualModePutExecutorService()); @@ -251,27 +229,6 @@ public class VisorIgfsConfiguration implements Serializable { } /** - * @return URI of the secondary Hadoop file system. - */ - @Nullable public String secondaryHadoopFileSystemUri() { - return secondaryHadoopFileSysUri; - } - - /** - * @return User name of the secondary Hadoop file system. - */ - @Nullable public String secondaryHadoopFileSystemUserName() { - return secondaryHadoopFileSysUserName; - } - - /** - * @return Path for the secondary hadoop file system config. - */ - @Nullable public String secondaryHadoopFileSystemConfigPath() { - return secondaryHadoopFileSysCfgPath; - } - - /** * @return IGFS instance mode. 
*/ public IgfsMode defaultMode() { http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index b290303..015b992 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -2744,7 +2744,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { finally { U.closeQuiet(os); - IgfsEx igfsEx = uni.getAdapter(IgfsEx.class); + IgfsEx igfsEx = uni.unwrap(IgfsEx.class); if (igfsEx != null) awaitFileClose(igfsEx.asSecondary(), file); @@ -2868,7 +2868,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @throws IgniteCheckedException If failed. */ protected void checkExist(UniversalFileSystemAdapter uni, IgfsPath... paths) throws IgniteCheckedException { - IgfsEx ex = uni.getAdapter(IgfsEx.class); + IgfsEx ex = uni.unwrap(IgfsEx.class); for (IgfsPath path : paths) { if (ex != null) assert ex.context().meta().fileId(path) != null : "Path doesn't exist [igfs=" + ex.name() + @@ -2921,7 +2921,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @throws Exception If failed. */ protected void checkNotExist(UniversalFileSystemAdapter uni, IgfsPath... 
paths) throws Exception { - IgfsEx ex = uni.getAdapter(IgfsEx.class); + IgfsEx ex = uni.unwrap(IgfsEx.class); for (IgfsPath path : paths) { if (ex != null) assert ex.context().meta().fileId(path) == null : "Path exists [igfs=" + ex.name() + ", path=" + @@ -3222,7 +3222,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { */ @SuppressWarnings("unchecked") public static void clear(UniversalFileSystemAdapter uni) throws Exception { - IgfsEx igfsEx = uni.getAdapter(IgfsEx.class); + IgfsEx igfsEx = uni.unwrap(IgfsEx.class); if (igfsEx != null) clear(igfsEx); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java index 7583364..c6bef72 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsExUniversalFileSystemAdapter.java @@ -28,7 +28,6 @@ import org.apache.ignite.igfs.IgfsPath; * Universal adapter over {@link IgfsEx} filesystem. */ public class IgfsExUniversalFileSystemAdapter implements UniversalFileSystemAdapter { - /** The wrapped igfs. 
*/ private final IgfsEx igfsEx; @@ -69,18 +68,14 @@ public class IgfsExUniversalFileSystemAdapter implements UniversalFileSystemAdap @Override public boolean delete(String path, boolean recursive) throws IOException { IgfsPath igfsPath = new IgfsPath(path); - boolean del = igfsEx.delete(igfsPath, recursive); - - return del; + return igfsEx.delete(igfsPath, recursive); } /** {@inheritDoc} */ @Override public InputStream openInputStream(String path) throws IOException { IgfsPath igfsPath = new IgfsPath(path); - IgfsInputStreamAdapter adapter = igfsEx.open(igfsPath); - - return adapter; + return igfsEx.open(igfsPath); } /** {@inheritDoc} */ @@ -97,7 +92,7 @@ public class IgfsExUniversalFileSystemAdapter implements UniversalFileSystemAdap } /** {@inheritDoc} */ - @Override public T getAdapter(Class clazz) { + @Override public T unwrap(Class clazz) { if (clazz == IgfsEx.class) return (T)igfsEx; http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java index ba8c164..eef0057 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/UniversalFileSystemAdapter.java @@ -31,8 +31,9 @@ public interface UniversalFileSystemAdapter { /** * Gets name of the FS. * @return name of this file system. + * @throws IOException in case of failure. */ - String name(); + String name() throws IOException; /** * Answers if a file denoted by path exists. @@ -93,5 +94,5 @@ public interface UniversalFileSystemAdapter { * @param The type we need to adapt to. 
* @return the adapter object of the given type. */ - T getAdapter(Class clazz); + T unwrap(Class clazz); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java new file mode 100644 index 0000000..1e2bbf2 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.hadoop.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.internal.processors.hadoop.HadoopUtils; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.jetbrains.annotations.Nullable; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Arrays; + +/** + * Simple Hadoop file system factory which delegates to {@code FileSystem.get()} on each call. + *

+ * If {@code "fs.[prefix].impl.disable.cache"} is set to {@code true}, file system instances will be cached by Hadoop. + */ +public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable, LifecycleAware { + /** */ + private static final long serialVersionUID = 0L; + + /** File system URI. */ + protected String uri; + + /** File system config paths. */ + protected String[] cfgPaths; + + /** Configuration of the secondary filesystem, never null. */ + protected transient Configuration cfg; + + /** Resulting URI. */ + protected transient URI fullUri; + + /** + * Constructor. + */ + public BasicHadoopFileSystemFactory() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public FileSystem get(String usrName) throws IOException { + return create0(IgfsUtils.fixUserName(usrName)); + } + + /** + * Internal file system create routine. + * + * @param usrName User name. + * @return File system. + * @throws IOException If failed. + */ + protected FileSystem create0(String usrName) throws IOException { + assert cfg != null; + + try { + return FileSystem.get(fullUri, cfg, usrName); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", e); + } + } + + /** + * Gets file system URI. + *

+ * This URI will be used as a first argument when calling {@link FileSystem#get(URI, Configuration, String)}. + *

+ * If not set, default URI will be picked from file system configuration using + * {@link FileSystem#getDefaultUri(Configuration)} method. + * + * @return File system URI. + */ + @Nullable public String getUri() { + return uri; + } + + /** + * Sets file system URI. See {@link #getUri()} for more information. + * + * @param uri File system URI. + */ + public void setUri(@Nullable String uri) { + this.uri = uri; + } + + /** + * Gets paths to additional file system configuration files (e.g. core-site.xml). + *

+ * Path could be either absolute or relative to {@code IGNITE_HOME} environment variable. + *

+ * All provided paths will be loaded in the order they provided and then applied to {@link Configuration}. It means + * that path order might be important in some cases. + *

+ * NOTE! Factory can be serialized and transferred to other machines where instance of + * {@link IgniteHadoopFileSystem} resides. Corresponding paths must exist on these machines as well. + * + * @return Paths to file system configuration files. + */ + @Nullable public String[] getConfigPaths() { + return cfgPaths; + } + + /** + * Set paths to additional file system configuration files (e.g. core-site.xml). See {@link #getConfigPaths()} for + * more information. + * + * @param cfgPaths Paths to file system configuration files. + */ + public void setConfigPaths(String... cfgPaths) { + this.cfgPaths = cfgPaths; + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + cfg = HadoopUtils.safeCreateConfiguration(); + + if (cfgPaths != null) { + for (String cfgPath : cfgPaths) { + if (cfgPath == null) + throw new NullPointerException("Configuration path cannot be null: " + Arrays.toString(cfgPaths)); + else { + URL url = U.resolveIgniteUrl(cfgPath); + + if (url == null) { + // If secConfPath is given, it should be resolvable: + throw new IgniteException("Failed to resolve secondary file system configuration path " + + "(ensure that it exists locally and you have read access to it): " + cfgPath); + } + + cfg.addResource(url); + } + } + } + + // If secondary fs URI is not given explicitly, try to get it from the configuration: + if (uri == null) + fullUri = FileSystem.getDefaultUri(cfg); + else { + try { + fullUri = new URI(uri); + } + catch (URISyntaxException use) { + throw new IgniteException("Failed to resolve secondary file system URI: " + uri); + } + } + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, uri); + + if (cfgPaths != null) { + out.writeInt(cfgPaths.length); + + for (String cfgPath : cfgPaths) + U.writeString(out, cfgPath); + } + else + out.writeInt(-1); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + uri = U.readString(in); + + int cfgPathsCnt = in.readInt(); + + if (cfgPathsCnt != -1) { + cfgPaths = new String[cfgPathsCnt]; + + for (int i = 0; i < cfgPathsCnt; i++) + cfgPaths[i] = U.readString(in); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java new file mode 100644 index 0000000..91f7777 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.fs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; + +import java.io.IOException; +import java.net.URI; + +/** + * Caching Hadoop file system factory. Caches {@link FileSystem} instances on per-user basis. Doesn't rely on + * built-in Hadoop {@code FileSystem} caching mechanics. Separate {@code FileSystem} instance is created for each + * user instead. + *

+ * This makes cache instance resistant to concurrent calls to {@link FileSystem#close()} in other parts of the user + * code. On the other hand, this might cause problems on some environments. E.g. if Kerberos is enabled, a call to + * {@link FileSystem#get(URI, Configuration, String)} will refresh Kerberos token. But this factory implementation + * calls this method only once per user what may lead to token expiration. In such cases it makes sense to either + * use {@link BasicHadoopFileSystemFactory} or implement your own factory. + */ +public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory { + /** */ + private static final long serialVersionUID = 0L; + + /** Per-user file system cache. */ + private final transient HadoopLazyConcurrentMap cache = new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory() { + @Override public FileSystem createValue(String key) throws IOException { + return create0(key); + } + } + ); + + /** + * Public non-arg constructor. + */ + public CachingHadoopFileSystemFactory() { + // noop + } + + /** {@inheritDoc} */ + @Override public FileSystem get(String usrName) throws IOException { + return cache.getOrCreate(IgfsUtils.fixUserName(usrName)); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + super.start(); + + // Disable caching. 
+ cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + super.stop(); + + try { + cache.close(); + } + catch (IgniteCheckedException ice) { + throw new IgniteException(ice); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java new file mode 100644 index 0000000..5ad08ab --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.hadoop.fs; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; +import org.apache.ignite.igfs.IgfsMode; +import org.apache.ignite.lifecycle.LifecycleAware; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Factory for Hadoop {@link FileSystem} used by {@link IgniteHadoopIgfsSecondaryFileSystem}. + *

+ * {@link #get(String)} method will be used whenever a call to a target {@code FileSystem} is required. + *

+ * It is implementation dependent whether to rely on built-in Hadoop file system cache, implement own caching facility + * or not to cache file systems at all. + *

+ * Concrete factory may implement {@link LifecycleAware} interface. In this case start and stop callbacks will be + * performed by Ignite. You may want to implement some initialization or cleanup there. + *

+ * Note that factory extends {@link Serializable} interface as it might be necessary to transfer factories over the + * wire to {@link IgniteHadoopFileSystem} if {@link IgfsMode#PROXY} is enabled for some file + * system paths. + */ +public interface HadoopFileSystemFactory extends Serializable { + /** + * Gets file system for the given user name. + * + * @param usrName User name + * @return File system. + * @throws IOException In case of error. + */ + public FileSystem get(String usrName) throws IOException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index 1ca6938..9f544c1 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -17,15 +17,7 @@ package org.apache.ignite.hadoop.fs; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.ParentNotDirectoryException; @@ -35,6 +27,7 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteFileSystem; import 
org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsFile; @@ -45,71 +38,59 @@ import org.apache.ignite.igfs.IgfsPathNotFoundException; import org.apache.ignite.igfs.IgfsUserContext; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.ValueFactory; +import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.processors.igfs.IgfsFileImpl; import org.apache.ignite.internal.processors.igfs.IgfsFileInfo; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Callable; + import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH; -import static 
org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_USER_NAME; /** - * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}. - * In fact, this class deals with different FileSystems depending on the user context, - * see {@link IgfsUserContext#currentUser()}. + * Secondary file system which delegates calls to an instance of Hadoop {@link FileSystem}. + *

+ * Target {@code FileSystem}'s are created on per-user basis using passed {@link HadoopFileSystemFactory}. */ -public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem { - /** Properties of file system, see {@link #properties()} - * - * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH} - * See {@link IgfsEx#SECONDARY_FS_URI} - * See {@link IgfsEx#SECONDARY_FS_USER_NAME} - * */ - private final Map props = new HashMap<>(); - - /** Secondary file system provider. */ - private final SecondaryFileSystemProvider secProvider; - +public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, LifecycleAware, + HadoopPayloadAware { /** The default user name. It is used if no user context is set. */ - private final String dfltUserName; + private String dfltUsrName; - /** FileSystem instance created for the default user. - * Stored outside the fileSysLazyMap due to performance reasons. */ - private final FileSystem dfltFs; + /** Factory. */ + private HadoopFileSystemFactory fsFactory; - /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ - private final HadoopLazyConcurrentMap fileSysLazyMap = new HadoopLazyConcurrentMap<>( - new ValueFactory() { - @Override public FileSystem createValue(String key) { - try { - assert !F.isEmpty(key); - - return secProvider.createFileSystem(key); - } - catch (IOException ioe) { - throw new IgniteException(ioe); - } - } - } - ); + /** + * Default constructor for Spring. + */ + public IgniteHadoopIgfsSecondaryFileSystem() { + // No-op. + } /** * Simple constructor that is to be used by default. * * @param uri URI of file system. * @throws IgniteCheckedException In case of error. + * @deprecated Use {@link #getFileSystemFactory()} instead. 
*/ + @Deprecated public IgniteHadoopIgfsSecondaryFileSystem(String uri) throws IgniteCheckedException { this(uri, null, null); } @@ -120,7 +101,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @param uri URI of file system. * @param cfgPath Additional path to Hadoop configuration. * @throws IgniteCheckedException In case of error. + * @deprecated Use {@link #getFileSystemFactory()} instead. */ + @Deprecated public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException { this(uri, cfgPath, null); @@ -131,46 +114,73 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * * @param uri URI of file system. * @param cfgPath Additional path to Hadoop configuration. - * @param userName User name. + * @param usrName User name. * @throws IgniteCheckedException In case of error. + * @deprecated Use {@link #getFileSystemFactory()} instead. */ + @Deprecated public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath, - @Nullable String userName) throws IgniteCheckedException { - // Treat empty uri and userName arguments as nulls to improve configuration usability: - if (F.isEmpty(uri)) - uri = null; - - if (F.isEmpty(cfgPath)) - cfgPath = null; - - if (F.isEmpty(userName)) - userName = null; + @Nullable String usrName) throws IgniteCheckedException { + setDefaultUserName(usrName); - this.dfltUserName = IgfsUtils.fixUserName(userName); + CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory(); - try { - this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath); + fac.setUri(uri); - // File system creation for the default user name. 
- // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field: - this.dfltFs = secProvider.createFileSystem(dfltUserName); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } + if (cfgPath != null) + fac.setConfigPaths(cfgPath); - assert dfltFs != null; + setFileSystemFactory(fac); + } - uri = secProvider.uri().toString(); + /** + * Gets default user name. + *

+ * Defines user name which will be used during file system invocation in case no user name is defined explicitly + * through {@link FileSystem#get(URI, Configuration, String)}. + *

+ * Also this name will be used if you manipulate {@link IgniteFileSystem} directly and do not set user name + * explicitly using {@link IgfsUserContext#doAs(String, IgniteOutClosure)} or + * {@link IgfsUserContext#doAs(String, Callable)} methods. + *

+ * If not set value of system property {@code "user.name"} will be used. If this property is not set either, + * {@code "anonymous"} will be used. + * + * @return Default user name. + */ + @Nullable public String getDefaultUserName() { + return dfltUsrName; + } - if (!uri.endsWith("/")) - uri += "/"; + /** + * Sets default user name. See {@link #getDefaultUserName()} for details. + * + * @param dfltUsrName Default user name. + */ + public void setDefaultUserName(@Nullable String dfltUsrName) { + this.dfltUsrName = dfltUsrName; + } - if (cfgPath != null) - props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); + /** + * Gets secondary file system factory. + *

+ * This factory will be used whenever a call to a target {@link FileSystem} is required. + *

+ * If not set, {@link CachingHadoopFileSystemFactory} will be used. + * + * @return Secondary file system factory. + */ + public HadoopFileSystemFactory getFileSystemFactory() { + return fsFactory; + } - props.put(SECONDARY_FS_URI, uri); - props.put(SECONDARY_FS_USER_NAME, dfltUserName); + /** + * Sets secondary file system factory. See {@link #getFileSystemFactory()} for details. + * + * @param factory Secondary file system factory. + */ + public void setFileSystemFactory(HadoopFileSystemFactory factory) { + this.fsFactory = factory; } /** @@ -180,7 +190,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @return Hadoop path. */ private Path convert(IgfsPath path) { - URI uri = fileSysForUser().getUri(); + URI uri = fileSystemForUser().getUri(); return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); } @@ -234,7 +244,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public boolean exists(IgfsPath path) { try { - return fileSysForUser().exists(convert(path)); + return fileSystemForUser().exists(convert(path)); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); @@ -245,7 +255,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Nullable @Override public IgfsFile update(IgfsPath path, Map props) { HadoopIgfsProperties props0 = new HadoopIgfsProperties(props); - final FileSystem fileSys = fileSysForUser(); + final FileSystem fileSys = fileSystemForUser(); try { if (props0.userName() != null || props0.groupName() != null) @@ -266,7 +276,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Override public void rename(IgfsPath src, IgfsPath dest) { // Delegate to the secondary file system. 
try { - if (!fileSysForUser().rename(convert(src), convert(dest))) + if (!fileSystemForUser().rename(convert(src), convert(dest))) throw new IgfsException("Failed to rename (secondary file system returned false) " + "[src=" + src + ", dest=" + dest + ']'); } @@ -278,7 +288,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public boolean delete(IgfsPath path, boolean recursive) { try { - return fileSysForUser().delete(convert(path), recursive); + return fileSystemForUser().delete(convert(path), recursive); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); @@ -288,7 +298,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public void mkdirs(IgfsPath path) { try { - if (!fileSysForUser().mkdirs(convert(path))) + if (!fileSystemForUser().mkdirs(convert(path))) throw new IgniteException("Failed to make directories [path=" + path + "]"); } catch (IOException e) { @@ -299,7 +309,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public void mkdirs(IgfsPath path, @Nullable Map props) { try { - if (!fileSysForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) + if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); } catch (IOException e) { @@ -310,7 +320,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public Collection listPaths(IgfsPath path) { try { - FileStatus[] statuses = fileSysForUser().listStatus(convert(path)); + FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); if (statuses == null) throw new IgfsPathNotFoundException("Failed to list files 
(path not found): " + path); @@ -333,7 +343,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public Collection listFiles(IgfsPath path) { try { - FileStatus[] statuses = fileSysForUser().listStatus(convert(path)); + FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); if (statuses == null) throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); @@ -360,13 +370,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { - return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), convert(path), bufSize); + return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize); } /** {@inheritDoc} */ @Override public OutputStream create(IgfsPath path, boolean overwrite) { try { - return fileSysForUser().create(convert(path), overwrite); + return fileSystemForUser().create(convert(path), overwrite); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]"); @@ -380,8 +390,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys new HadoopIgfsProperties(props != null ? 
props : Collections.emptyMap()); try { - return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize, - (short)replication, blockSize, null); + return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize, + (short) replication, blockSize, null); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + @@ -394,7 +404,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, @Nullable Map props) { try { - return fileSysForUser().append(convert(path), bufSize); + return fileSystemForUser().append(convert(path), bufSize); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); @@ -404,7 +414,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public IgfsFile info(final IgfsPath path) { try { - final FileStatus status = fileSysForUser().getFileStatus(convert(path)); + final FileStatus status = fileSystemForUser().getFileStatus(convert(path)); if (status == null) return null; @@ -479,65 +489,61 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys try { // We don't use FileSystem#getUsed() since it counts only the files // in the filesystem root, not all the files recursively. 
- return fileSysForUser().getContentSummary(new Path("/")).getSpaceConsumed(); + return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed(); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to get used space size of file system."); } } - /** {@inheritDoc} */ - @Override public Map properties() { - return props; - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteException { - Exception e = null; - - try { - dfltFs.close(); - } - catch (Exception e0) { - e = e0; - } - - try { - fileSysLazyMap.close(); - } - catch (IgniteCheckedException ice) { - if (e == null) - e = ice; - } - - if (e != null) - throw new IgniteException(e); - } - /** * Gets the underlying {@link FileSystem}. * This method is used solely for testing. * @return the underlying Hadoop {@link FileSystem}. */ public FileSystem fileSystem() { - return fileSysForUser(); + return fileSystemForUser(); } /** * Gets the FileSystem for the current context user. * @return the FileSystem instance, never null. */ - private FileSystem fileSysForUser() { + private FileSystem fileSystemForUser() { String user = IgfsUserContext.currentUser(); if (F.isEmpty(user)) - user = dfltUserName; // default is never empty. 
+ user = IgfsUtils.fixUserName(dfltUsrName); assert !F.isEmpty(user); - if (F.eq(user, dfltUserName)) - return dfltFs; // optimization + try { + return fsFactory.get(user); + } + catch (IOException ioe) { + throw new IgniteException(ioe); + } + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + dfltUsrName = IgfsUtils.fixUserName(dfltUsrName); + + if (fsFactory == null) + fsFactory = new CachingHadoopFileSystemFactory(); + + if (fsFactory instanceof LifecycleAware) + ((LifecycleAware) fsFactory).start(); + } - return fileSysLazyMap.getOrCreate(user); + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (fsFactory instanceof LifecycleAware) + ((LifecycleAware)fsFactory).stop(); + } + + /** {@inheritDoc} */ + @Override public HadoopFileSystemFactory getPayload() { + return fsFactory; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 5dce67f..71f6435 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -17,19 +17,6 @@ package org.apache.ignite.hadoop.fs.v1; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.ContentSummary; @@ -43,7 +30,9 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsFile; @@ -51,7 +40,6 @@ import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsPathSummary; import org.apache.ignite.internal.igfs.common.IgfsLogger; -import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream; @@ -68,8 +56,23 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE; import 
static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR; import static org.apache.ignite.igfs.IgfsMode.PROXY; @@ -85,8 +88,6 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PREFER_LOCAL_WRITES; import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI; /** * {@code IGFS} Hadoop 1.x file system driver over file system API. To use @@ -165,8 +166,8 @@ public class IgniteHadoopFileSystem extends FileSystem { /** IGFS mode resolver. */ private IgfsModeResolver modeRslvr; - /** Secondary file system instance. */ - private FileSystem secondaryFs; + /** The secondary file system factory. */ + private HadoopFileSystemFactory factory; /** Management connection flag. 
*/ private boolean mgmt; @@ -327,21 +328,28 @@ public class IgniteHadoopFileSystem extends FileSystem { } if (initSecondary) { - Map props = paths.properties(); + try { + factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader()); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to get secondary file system factory.", e); + } + + assert factory != null; - String secUri = props.get(SECONDARY_FS_URI); - String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); + if (factory instanceof LifecycleAware) + ((LifecycleAware) factory).start(); try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); + FileSystem secFs = factory.get(user); - secondaryFs = secProvider.createFileSystem(user); + secondaryUri = secFs.getUri(); - secondaryUri = secProvider.uri(); + A.ensure(secondaryUri != null, "Secondary file system uri should not be null."); } catch (IOException e) { if (!mgmt) - throw new IOException("Failed to connect to the secondary file system: " + secUri, e); + throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e); else LOG.warn("Visor failed to create secondary file system (operations on paths with PROXY mode " + "will have no effect): " + e.getMessage()); @@ -409,8 +417,8 @@ public class IgniteHadoopFileSystem extends FileSystem { if (clientLog.isLogEnabled()) clientLog.close(); - if (secondaryFs != null) - U.closeQuiet(secondaryFs); + if (factory instanceof LifecycleAware) + ((LifecycleAware) factory).stop(); // Reset initialized resources. 
uri = null; @@ -425,6 +433,8 @@ public class IgniteHadoopFileSystem extends FileSystem { A.notNull(p, "p"); if (mode(p) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -453,6 +463,8 @@ public class IgniteHadoopFileSystem extends FileSystem { A.notNull(p, "p"); if (mode(p) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -482,6 +494,8 @@ public class IgniteHadoopFileSystem extends FileSystem { try { if (mode(p) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -490,8 +504,7 @@ public class IgniteHadoopFileSystem extends FileSystem { } secondaryFs.setOwner(toSecondary(p), username, grpName); - } - else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, username, PROP_GROUP_NAME, grpName)) == null) + } else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, username, PROP_GROUP_NAME, grpName)) == null) throw new IOException("Failed to set file permission (file not found?)" + " [path=" + p + ", userName=" + username + ", groupName=" + grpName + ']'); } @@ -511,6 +524,8 @@ public class IgniteHadoopFileSystem extends FileSystem { IgfsMode mode = mode(path); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -583,6 +598,8 @@ public class IgniteHadoopFileSystem extends FileSystem { path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']'); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -664,6 +681,8 @@ public class IgniteHadoopFileSystem extends FileSystem { ", path=" + path + ", bufSize=" + bufSize + ']'); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -727,6 +746,8 @@ public class IgniteHadoopFileSystem extends FileSystem { IgfsMode mode = 
mode(srcPath); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -787,6 +808,8 @@ public class IgniteHadoopFileSystem extends FileSystem { IgfsMode mode = mode(path); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -832,6 +855,8 @@ public class IgniteHadoopFileSystem extends FileSystem { IgfsMode mode = mode(path); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -896,26 +921,35 @@ public class IgniteHadoopFileSystem extends FileSystem { /** {@inheritDoc} */ @Override public void setWorkingDirectory(Path newPath) { - if (newPath == null) { - Path homeDir = getHomeDirectory(); + try { + if (newPath == null) { + Path homeDir = getHomeDirectory(); - if (secondaryFs != null) - secondaryFs.setWorkingDirectory(toSecondary(homeDir)); + FileSystem secondaryFs = secondaryFileSystem(); - workingDir = homeDir; - } - else { - Path fixedNewPath = fixRelativePart(newPath); + if (secondaryFs != null) + secondaryFs.setWorkingDirectory(toSecondary(homeDir)); + + workingDir = homeDir; + } + else { + Path fixedNewPath = fixRelativePart(newPath); - String res = fixedNewPath.toUri().getPath(); + String res = fixedNewPath.toUri().getPath(); - if (!DFSUtil.isValidName(res)) - throw new IllegalArgumentException("Invalid DFS directory name " + res); + if (!DFSUtil.isValidName(res)) + throw new IllegalArgumentException("Invalid DFS directory name " + res); - if (secondaryFs != null) - secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath)); + FileSystem secondaryFs = secondaryFileSystem(); - workingDir = fixedNewPath; + if (secondaryFs != null) + secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath)); + + workingDir = fixedNewPath; + } + } + catch (IOException e) { + throw new RuntimeException("Failed to obtain secondary file system instance.", e); } } @@ -936,6 +970,8 
@@ public class IgniteHadoopFileSystem extends FileSystem { IgfsMode mode = mode(path); if (mode == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -977,6 +1013,8 @@ public class IgniteHadoopFileSystem extends FileSystem { try { if (mode(f) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -1007,6 +1045,8 @@ public class IgniteHadoopFileSystem extends FileSystem { try { if (mode(f) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -1038,6 +1078,8 @@ public class IgniteHadoopFileSystem extends FileSystem { IgfsPath path = convert(status.getPath()); if (mode(status.getPath()) == PROXY) { + final FileSystem secondaryFs = secondaryFileSystem(); + if (secondaryFs == null) { assert mgmt; @@ -1103,7 +1145,7 @@ public class IgniteHadoopFileSystem extends FileSystem { * @return {@code true} If secondary file system is initialized. */ public boolean hasSecondaryFileSystem() { - return secondaryFs != null; + return factory != null; } /** @@ -1123,7 +1165,7 @@ public class IgniteHadoopFileSystem extends FileSystem { * @return Secondary file system path. */ private Path toSecondary(Path path) { - assert secondaryFs != null; + assert factory != null; assert secondaryUri != null; return convertPath(path, secondaryUri); @@ -1298,4 +1340,16 @@ public class IgniteHadoopFileSystem extends FileSystem { public String user() { return user; } + + /** + * Gets cached or creates a {@link FileSystem}. + * + * @return The secondary file system. + */ + private @Nullable FileSystem secondaryFileSystem() throws IOException{ + if (factory == null) + return null; + + return factory.get(user); + } } \ No newline at end of file