Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 64DEF17663 for ; Thu, 5 Mar 2015 01:19:05 +0000 (UTC) Received: (qmail 93119 invoked by uid 500); 5 Mar 2015 01:19:05 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 93084 invoked by uid 500); 5 Mar 2015 01:19:05 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 93075 invoked by uid 99); 5 Mar 2015 01:19:05 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Mar 2015 01:19:05 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 05 Mar 2015 01:18:38 +0000 Received: (qmail 90750 invoked by uid 99); 5 Mar 2015 01:18:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Mar 2015 01:18:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A54A1E0FC4; Thu, 5 Mar 2015 01:18:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: anovikov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 05 Mar 2015 01:18:41 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [08/14] incubator-ignite git commit: # IGNITE-349: Applied patch from Ivan V.. 
X-Virus-Checked: Checked by ClamAV on apache.org # IGNITE-349: Applied patch from Ivan V.. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/03b966fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/03b966fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/03b966fe Branch: refs/heads/ignite-185 Commit: 03b966fe01c341a9ff6dcfcbaddaaa8e7f64ec84 Parents: 5d2a7c6 Author: vozerov-gridgain Authored: Wed Mar 4 17:50:53 2015 +0300 Committer: vozerov-gridgain Committed: Wed Mar 4 17:50:53 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/processors/igfs/IgfsEx.java | 6 + .../visor/node/VisorIgfsConfiguration.java | 7 +- .../hadoop/IgfsHadoopFileSystemWrapper.java | 44 +- .../igfs/hadoop/v1/IgfsHadoopFileSystem.java | 48 +- .../igfs/hadoop/v2/IgfsHadoopFileSystem.java | 54 +- .../hadoop/SecondaryFileSystemProvider.java | 111 ++++ ...oopSecondaryFileSystemConfigurationTest.java | 541 +++++++++++++++++++ .../testsuites/IgniteHadoopTestSuite.java | 2 + 8 files changed, 712 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java index e067e78..a380a6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java @@ -29,6 +29,12 @@ import java.net.*; * Internal API extension for {@link org.apache.ignite.IgniteFs}. 
*/ public interface IgfsEx extends IgniteFs { + /** Property name for path to Hadoop configuration. */ + String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH"; + + /** Property name for URI of file system. */ + String SECONDARY_FS_URI = "SECONDARY_FS_URI"; + /** * Stops IGFS cleaning all used resources. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java index 056ac7f..78943a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java @@ -21,6 +21,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; +import static org.apache.ignite.internal.processors.igfs.IgfsEx.*; import java.io.*; import java.util.*; @@ -31,12 +32,6 @@ import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*; * Data transfer object for IGFS configuration properties. */ public class VisorIgfsConfiguration implements Serializable { - /** Property name for path to Hadoop configuration. */ - public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH"; - - /** Property name for URI of file system. 
*/ - public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI"; - /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java index 29dfde5..bdab61d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java @@ -17,7 +17,6 @@ package org.apache.ignite.igfs.hadoop; -import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; @@ -25,10 +24,11 @@ import org.apache.hadoop.ipc.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.igfs.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; +import static org.apache.ignite.internal.processors.igfs.IgfsEx.*; import java.io.*; import java.net.*; @@ -38,11 +38,6 @@ import java.util.*; * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link org.apache.ignite.igfs.Igfs}. */ public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable { - /** Property name for path to Hadoop configuration. */ - public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH"; - - /** Property name for URI of file system. 
*/ - public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI"; /** Hadoop file system. */ private final FileSystem fileSys; @@ -51,6 +46,16 @@ public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable { private final Map props = new HashMap<>(); /** + * Simple constructor that is to be used by default. + * + * @param uri URI of file system. + * @throws IgniteCheckedException In case of error. + */ + public IgfsHadoopFileSystemWrapper(String uri) throws IgniteCheckedException { + this(uri, null); + } + + /** * Constructor. * * @param uri URI of file system. @@ -58,25 +63,22 @@ public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable { * @throws IgniteCheckedException In case of error. */ public IgfsHadoopFileSystemWrapper(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException { - Configuration cfg = new Configuration(); + try { + SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath); - if (cfgPath != null) - cfg.addResource(U.resolveIgniteUrl(cfgPath)); + fileSys = secProvider.createFileSystem(); - try { - fileSys = uri == null ? 
FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg); + uri = secProvider.uri().toString(); + + if (!uri.endsWith("/")) + uri += "/"; + + props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); + props.put(SECONDARY_FS_URI, uri); } - catch (IOException | URISyntaxException e) { + catch (IOException e) { throw new IgniteCheckedException(e); } - - uri = fileSys.getUri().toString(); - - if (!uri.endsWith("/")) - uri += "/"; - - props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); - props.put(SECONDARY_FS_URI, uri); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java index 1648bdc..c4d2f5e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java @@ -26,9 +26,9 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.util.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.hadoop.*; import org.apache.ignite.internal.igfs.common.*; import org.apache.ignite.internal.igfs.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -44,6 +44,7 @@ import static org.apache.ignite.configuration.IgfsConfiguration.*; import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*; import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; +import static org.apache.ignite.internal.processors.igfs.IgfsEx.*; 
/** * {@code IGFS} Hadoop 1.x file system driver over file system API. To use @@ -57,7 +58,7 @@ import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; * * <property> * <name>fs.igfs.impl</name> - * <value>org.apache.ignite.igfs.hadoop.IgfsHadoopFileSystem</value> + * <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value> * </property> * * You should also add Ignite JAR and all libraries to Hadoop classpath. To @@ -271,50 +272,29 @@ public class IgfsHadoopFileSystem extends FileSystem { boolean initSecondary = paths.defaultMode() == PROXY; - if (paths.pathModes() != null && !paths.pathModes().isEmpty()) { + if (!initSecondary && paths.pathModes() != null && !paths.pathModes().isEmpty()) { for (T2 pathMode : paths.pathModes()) { IgfsMode mode = pathMode.getValue(); - initSecondary |= mode == PROXY; + if (mode == PROXY) { + initSecondary = true; + + break; + } } } if (initSecondary) { Map props = paths.properties(); - String secUri = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_URI); - String secConfPath = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_CONFIG_PATH); - - if (secConfPath == null) - throw new IOException("Failed to connect to the secondary file system because configuration " + - "path is not provided."); - - if (secUri == null) - throw new IOException("Failed to connect to the secondary file system because URI is not " + - "provided."); + String secUri = props.get(SECONDARY_FS_URI); + String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); try { - secondaryUri = new URI(secUri); - - URL secondaryCfgUrl = U.resolveIgniteUrl(secConfPath); - - Configuration conf = new Configuration(); - - if (secondaryCfgUrl != null) - conf.addResource(secondaryCfgUrl); + SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); - String prop = String.format("fs.%s.impl.disable.cache", secondaryUri.getScheme()); - - conf.setBoolean(prop, true); - - secondaryFs = FileSystem.get(secondaryUri, 
conf); - } - catch (URISyntaxException ignore) { - if (!mgmt) - throw new IOException("Failed to resolve secondary file system URI: " + secUri); - else - LOG.warn("Visor failed to create secondary file system (operations on paths with PROXY mode " + - "will have no effect)."); + secondaryFs = secProvider.createFileSystem(); + secondaryUri = secProvider.uri(); } catch (IOException e) { if (!mgmt) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java index 5475cf4..0759203 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java @@ -26,9 +26,9 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.util.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.hadoop.*; import org.apache.ignite.internal.igfs.common.*; import org.apache.ignite.internal.igfs.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -44,6 +44,7 @@ import static org.apache.ignite.configuration.IgfsConfiguration.*; import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*; import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; +import static org.apache.ignite.internal.processors.igfs.IgfsEx.*; /** * {@code IGFS} Hadoop 2.x file system driver over file system API. 
To use @@ -57,7 +58,7 @@ import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; * * <property> * <name>fs.igfs.impl</name> - * <value>org.apache.ignite.igfs.hadoop.IgfsHadoopFileSystem</value> + * <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value> * </property> * * You should also add Ignite JAR and all libraries to Hadoop classpath. To @@ -266,56 +267,29 @@ public class IgfsHadoopFileSystem extends AbstractFileSystem implements Closeabl boolean initSecondary = paths.defaultMode() == PROXY; - if (paths.pathModes() != null) { + if (!initSecondary && paths.pathModes() != null) { for (T2 pathMode : paths.pathModes()) { IgfsMode mode = pathMode.getValue(); - initSecondary |= mode == PROXY; + if (mode == PROXY) { + initSecondary = true; + + break; + } } } if (initSecondary) { Map props = paths.properties(); - String secUri = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_URI); - String secConfPath = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_CONFIG_PATH); - - if (secConfPath == null) - throw new IOException("Failed to connect to the secondary file system because configuration " + - "path is not provided."); - - if (secUri == null) - throw new IOException("Failed to connect to the secondary file system because URI is not " + - "provided."); - - if (secConfPath == null) - throw new IOException("Failed to connect to the secondary file system because configuration " + - "path is not provided."); - - if (secUri == null) - throw new IOException("Failed to connect to the secondary file system because URI is not " + - "provided."); + String secUri = props.get(SECONDARY_FS_URI); + String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); try { - secondaryUri = new URI(secUri); - - URL secondaryCfgUrl = U.resolveIgniteUrl(secConfPath); + SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); - if (secondaryCfgUrl == null) - throw new IOException("Failed to resolve secondary file system config 
URL: " + secConfPath); - - Configuration conf = new Configuration(); - - conf.addResource(secondaryCfgUrl); - - String prop = String.format("fs.%s.impl.disable.cache", secondaryUri.getScheme()); - - conf.setBoolean(prop, true); - - secondaryFs = AbstractFileSystem.get(secondaryUri, conf); - } - catch (URISyntaxException ignore) { - throw new IOException("Failed to resolve secondary file system URI: " + secUri); + secondaryFs = secProvider.createAbstractFileSystem(); + secondaryUri = secProvider.uri(); } catch (IOException e) { throw new IOException("Failed to connect to the secondary file system: " + secUri, e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java new file mode 100644 index 0000000..c1dceba --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; + +/** + * Encapsulates logic of secondary filesystem creation. + */ +public class SecondaryFileSystemProvider { + /** Configuration of the secondary filesystem, never null. */ + private final Configuration cfg = new Configuration(); + + /** The secondary filesystem URI, never null. */ + private final URI uri; + + /** + * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be + * specified either explicitly or in the configuration provided. + * + * @param secUri the secondary Fs URI (optional). If not given explicitly, it must be specified as "fs.defaultFS" + * property in the provided configuration. + * @param secConfPath the secondary Fs configuration path (file path on the local file system, optional). + * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path is resolved. 
+ * @throws IOException + */ + public SecondaryFileSystemProvider(final @Nullable String secUri, + final @Nullable String secConfPath) throws IOException { + if (secConfPath != null) { + URL url = U.resolveIgniteUrl(secConfPath); + + if (url == null) { + // If secConfPath is given, it should be resolvable: + throw new IllegalArgumentException("Failed to resolve secondary file system " + + "configuration path: " + secConfPath); + } + + cfg.addResource(url); + } + + // if secondary fs URI is not given explicitly, try to get it from the configuration: + if (secUri == null) + uri = FileSystem.getDefaultUri(cfg); + else { + try { + uri = new URI(secUri); + } + catch (URISyntaxException use) { + throw new IOException("Failed to resolve secondary file system URI: " + secUri); + } + } + + if (uri == null) + throw new IllegalArgumentException("Failed to get secondary file system URI (it is neither given " + + "explicitly nor specified in the configuration): " + secConfPath); + + // Disable caching: + String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme()); + + cfg.setBoolean(prop, true); + } + + /** + * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs. + * @throws IOException + */ + public FileSystem createFileSystem() throws IOException { + FileSystem fileSys = FileSystem.get(uri, cfg); + + return fileSys; + } + + /** + * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs. + * @throws IOException + */ + public AbstractFileSystem createAbstractFileSystem() throws IOException { + AbstractFileSystem secondaryFs = AbstractFileSystem.get(uri, cfg); + + return secondaryFs; + } + + /** + * @return the secondary fs URI, never null. 
+ */ + public URI uri() { + return uri; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java new file mode 100644 index 0000000..4ad74d0 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -0,0 +1,541 @@ +package org.apache.ignite.igfs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.permission.*; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.hadoop.*; +import org.apache.ignite.igfs.hadoop.v1.*; +import org.apache.ignite.internal.igfs.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.communication.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.igfs.IgfsMode.*; +import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*; + 
+/** + * Tests secondary file system configuration. + */ +public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstractTest { + /** IGFS scheme */ + private static final String IGFS_SCHEME = "igfs"; + + /** Primary file system authority. */ + private static final String PRIMARY_AUTHORITY = "igfs:grid0@"; + + /** Autogenerated primary file system configuration path. */ + private static final String PRIMARY_CFG_PATH = "/work/core-site-primary-test.xml"; + + /** Secondary file system authority. */ + private static final String SECONDARY_AUTHORITY = "igfs_secondary:grid_secondary@127.0.0.1:11500"; + + /** Autogenerated secondary file system configuration path. */ + private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml"; + + /** Secondary endpoint configuration. */ + protected static final Map SECONDARY_ENDPOINT_CFG = new HashMap() {{ + put("type", "tcp"); + put("port", "11500"); + }}; + + /** Group size. */ + public static final int GRP_SIZE = 128; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Primary file system URI. */ + protected URI primaryFsUri; + + /** Primary file system. 
*/ + private FileSystem primaryFs; + + /** Full path of primary Fs configuration */ + private String primaryConfFullPath; + + /** Input primary Fs uri */ + private String primaryFsUriStr; + + /** Input URI scheme for configuration */ + private String primaryCfgScheme; + + /** Input URI authority for configuration */ + private String primaryCfgAuthority; + + /** Whether to pass the primary Fs configuration */ + private boolean passPrimaryConfiguration; + + /** Full path of secondary Fs configuration */ + private String secondaryConfFullPath; + + /** Input secondary Fs uri */ + private String secondaryFsUriStr; + + /** Input URI scheme for configuration */ + private String secondaryCfgScheme; + + /** Input URI authority for configuration */ + private String secondaryCfgAuthority; + + /** Whether to pass the secondary Fs configuration */ + private boolean passSecondaryConfiguration; + + /** Default IGFS mode. */ + protected final IgfsMode mode; + + /** Skip embedded mode flag. */ + private final boolean skipEmbed; + + /** Skip local shmem flag. */ + private final boolean skipLocShmem; + + /** + * Constructor. + * + * @param mode Default IGFS mode. + * @param skipEmbed Whether to skip embedded mode. + * @param skipLocShmem Whether to skip local shmem mode. + */ + protected HadoopSecondaryFileSystemConfigurationTest(IgfsMode mode, boolean skipEmbed, boolean skipLocShmem) { + this.mode = mode; + this.skipEmbed = skipEmbed; + this.skipLocShmem = skipLocShmem; + } + + /** + * Default constructor. + */ + public HadoopSecondaryFileSystemConfigurationTest() { + this(PROXY, true, false); + } + + /** + * Executes before each test. 
+ * @throws Exception + */ + private void before() throws Exception { + initSecondary(); + + if (passPrimaryConfiguration) { + Configuration primaryFsCfg = configuration(primaryCfgScheme, primaryCfgAuthority, skipEmbed, skipLocShmem); + + primaryConfFullPath = writeConfiguration(primaryFsCfg, PRIMARY_CFG_PATH); + } + else + primaryConfFullPath = null; + + SecondaryFileSystemProvider provider = + new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath); + + primaryFs = provider.createFileSystem(); + + primaryFsUri = provider.uri(); + } + + /** + * Executes after each test. + * @throws Exception + */ + private void after() throws Exception { + if (primaryFs != null) { + try { + primaryFs.delete(new Path("/"), true); + } + catch (Exception ignore) { + // No-op. + } + + U.closeQuiet(primaryFs); + } + + G.stopAll(true); + + delete(primaryConfFullPath); + delete(secondaryConfFullPath); + } + + /** + * Utility method to delete file. + * + * @param file the file path to delete. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + private static void delete(String file) { + if (file != null) { + new File(file).delete(); + + assertFalse(new File(file).exists()); + } + } + + /** + * Initialize underlying secondary filesystem. + * + * @throws Exception + */ + private void initSecondary() throws Exception { + if (passSecondaryConfiguration) { + Configuration secondaryConf = configuration(secondaryCfgScheme, secondaryCfgAuthority, true, true); + + secondaryConf.setInt("fs.igfs.block.size", 1024); + + secondaryConfFullPath = writeConfiguration(secondaryConf, SECONDARY_CFG_PATH); + } + else + secondaryConfFullPath = null; + + startNodes(); + } + + /** + * Starts the nodes for this test. + * + * @throws Exception If failed. 
+ */ + private void startNodes() throws Exception { + if (mode != PRIMARY) + startSecondary(); + + startGrids(4); + } + + /** + * Starts secondary IGFS + */ + private void startSecondary() { + IgfsConfiguration igfsCfg = new IgfsConfiguration(); + + igfsCfg.setDataCacheName("partitioned"); + igfsCfg.setMetaCacheName("replicated"); + igfsCfg.setName("igfs_secondary"); + igfsCfg.setIpcEndpointConfiguration(SECONDARY_ENDPOINT_CFG); + igfsCfg.setBlockSize(512 * 1024); + igfsCfg.setPrefetchBlocks(1); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE)); + cacheCfg.setBackups(0); + cacheCfg.setQueryIndexEnabled(false); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setQueryIndexEnabled(false); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName("grid_secondary"); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); + cfg.setIgfsConfiguration(igfsCfg); + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + cfg.setCommunicationSpi(communicationSpi()); + + G.start(cfg); + } + + /** + * Get primary IPC endpoint configuration. + * + * @param gridName Grid name. + * @return IPC primary endpoint configuration. 
+ */ + protected Map primaryIpcEndpointConfiguration(final String gridName) { + return new HashMap() {{ + put("type", "tcp"); + put("port", String.valueOf(DFLT_IPC_PORT + getTestGridIndex(gridName))); + }}; + } + + /** {@inheritDoc} */ + @Override public String getTestGridName() { + return "grid"; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(cacheConfiguration()); + cfg.setIgfsConfiguration(igfsConfiguration(gridName)); + cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + cfg.setCommunicationSpi(communicationSpi()); + + return cfg; + } + + /** + * Gets cache configuration. + * + * @return Cache configuration. + */ + protected CacheConfiguration[] cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE)); + cacheCfg.setBackups(0); + cacheCfg.setQueryIndexEnabled(false); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("replicated"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setQueryIndexEnabled(false); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + return new CacheConfiguration[] {metaCacheCfg, cacheCfg}; + } + + /** + * Gets IGFS configuration. + * + * @param gridName Grid name. + * @return IGFS configuration. 
+ */ + protected IgfsConfiguration igfsConfiguration(String gridName) throws IgniteCheckedException { + IgfsConfiguration cfg = new IgfsConfiguration(); + + cfg.setDataCacheName("partitioned"); + cfg.setMetaCacheName("replicated"); + cfg.setName("igfs"); + cfg.setPrefetchBlocks(1); + cfg.setDefaultMode(mode); + + if (mode != PRIMARY) + cfg.setSecondaryFileSystem( + new IgfsHadoopFileSystemWrapper(secondaryFsUriStr, secondaryConfFullPath)); + + cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName)); + + cfg.setManagementPort(-1); + cfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. + + return cfg; + } + + /** @return Communication SPI. */ + private CommunicationSpi communicationSpi() { + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + return commSpi; + } + + /** + * Case #SecondaryFileSystemProvider(null, path) + * + * @throws Exception On failure. + */ + public void testFsConfigurationOnly() throws Exception { + primaryCfgScheme = IGFS_SCHEME; + primaryCfgAuthority = PRIMARY_AUTHORITY; + passPrimaryConfiguration = true; + primaryFsUriStr = null; + + // wrong secondary URI in the configuration: + secondaryCfgScheme = IGFS_SCHEME; + secondaryCfgAuthority = SECONDARY_AUTHORITY; + passSecondaryConfiguration = true; + secondaryFsUriStr = null; + + check(); + } + + /** + * Case #SecondaryFileSystemProvider(uri, path), when 'uri' parameter overrides + * the Fs uri set in the configuration. + * + * @throws Exception On failure. 
+ */ + public void testFsUriOverridesUriInConfiguration() throws Exception { + // wrong primary URI in the configuration: + primaryCfgScheme = "foo"; + primaryCfgAuthority = "moo:zoo@bee"; + passPrimaryConfiguration = true; + primaryFsUriStr = mkUri(IGFS_SCHEME, PRIMARY_AUTHORITY); + + // wrong secondary URI in the configuration: + secondaryCfgScheme = "foo"; + secondaryCfgAuthority = "moo:zoo@bee"; + passSecondaryConfiguration = true; + secondaryFsUriStr = mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY); + + check(); + } + + /** + * Perform actual check. + * + * @throws Exception If failed. + */ + @SuppressWarnings("deprecation") + private void check() throws Exception { + before(); + + try { + Path fsHome = new Path(primaryFsUri); + Path dir = new Path(fsHome, "/someDir1/someDir2/someDir3"); + Path file = new Path(dir, "someFile"); + + assertPathDoesNotExist(primaryFs, file); + + FsPermission fsPerm = new FsPermission((short)644); + + FSDataOutputStream os = primaryFs.create(file, fsPerm, false, 1, (short)1, 1L, null); + + // Try to write something in file. + os.write("abc".getBytes()); + + os.close(); + + // Check file status. + FileStatus fileStatus = primaryFs.getFileStatus(file); + + assertFalse(fileStatus.isDir()); + assertEquals(file, fileStatus.getPath()); + assertEquals(fsPerm, fileStatus.getPermission()); + } + finally { + after(); + } + } + + /** + * Create configuration for test. + * + * @param skipEmbed Whether to skip embedded mode. + * @param skipLocShmem Whether to skip local shmem mode. + * @return Configuration. 
+ */ + private static Configuration configuration(String scheme, String authority, boolean skipEmbed, boolean skipLocShmem) { + final Configuration cfg = new Configuration(); + + if (scheme != null && authority != null) + cfg.set("fs.defaultFS", scheme + "://" + authority + "/"); + + setImplClasses(cfg); + + if (authority != null) { + if (skipEmbed) + cfg.setBoolean(String.format(IgfsHadoopUtils.PARAM_IGFS_ENDPOINT_NO_EMBED, authority), true); + + if (skipLocShmem) + cfg.setBoolean(String.format(IgfsHadoopUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority), true); + } + + return cfg; + } + + /** + * Sets Hadoop Fs implementation classes. + * + * @param cfg the configuration to set parameters into. + */ + private static void setImplClasses(Configuration cfg) { + cfg.set("fs.igfs.impl", IgfsHadoopFileSystem.class.getName()); + + cfg.set("fs.AbstractFileSystem.igfs.impl", + org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem.class.getName()); + } + + /** + * Check path does not exist in a given FileSystem. + * + * @param fs FileSystem to check. + * @param path Path to check. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + private void assertPathDoesNotExist(final FileSystem fs, final Path path) { + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return fs.getFileStatus(path); + } + }, FileNotFoundException.class, null); + } + + /** + * Writes down the configuration to local disk and returns its path. + * + * @param cfg the configuration to write. + * @param pathFromIgniteHome path relatively to Ignite home. + * @return Full path of the written configuration. 
+ */ + private static String writeConfiguration(Configuration cfg, String pathFromIgniteHome) throws IOException { + if (!pathFromIgniteHome.startsWith("/")) + pathFromIgniteHome = "/" + pathFromIgniteHome; + + final String path = U.getIgniteHome() + pathFromIgniteHome; + + delete(path); + + File file = new File(path); + + try (FileOutputStream fos = new FileOutputStream(file)) { + cfg.writeXml(fos); + } + + assertTrue(file.exists()); + return path; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60 * 1000; + } + + /** + * Makes URI. + * + * @param scheme the scheme + * @param authority the authority + * @return URI String + */ + private static String mkUri(String scheme, String authority) { + return scheme + "://" + authority + "/"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index a03eb81..836cdaa 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -120,6 +120,8 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(GridHadoopCommandLineTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSecondaryFileSystemConfigurationTest.class.getName()))); + return suite; }