ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [3/7] incubator-ignite git commit: # IGNITE-349: Applied patch from Ivan V..
Date Wed, 04 Mar 2015 14:57:59 GMT
# IGNITE-349: Applied patch from Ivan V..


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/03b966fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/03b966fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/03b966fe

Branch: refs/heads/ignite-386
Commit: 03b966fe01c341a9ff6dcfcbaddaaa8e7f64ec84
Parents: 5d2a7c6
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Wed Mar 4 17:50:53 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed Mar 4 17:50:53 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/processors/igfs/IgfsEx.java |   6 +
 .../visor/node/VisorIgfsConfiguration.java      |   7 +-
 .../hadoop/IgfsHadoopFileSystemWrapper.java     |  44 +-
 .../igfs/hadoop/v1/IgfsHadoopFileSystem.java    |  48 +-
 .../igfs/hadoop/v2/IgfsHadoopFileSystem.java    |  54 +-
 .../hadoop/SecondaryFileSystemProvider.java     | 111 ++++
 ...oopSecondaryFileSystemConfigurationTest.java | 541 +++++++++++++++++++
 .../testsuites/IgniteHadoopTestSuite.java       |   2 +
 8 files changed, 712 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index e067e78..a380a6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -29,6 +29,12 @@ import java.net.*;
  * Internal API extension for {@link org.apache.ignite.IgniteFs}.
  */
 public interface IgfsEx extends IgniteFs {
+    /** Property name for path to Hadoop configuration. */
+    String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
+
+    /** Property name for URI of file system. */
+    String SECONDARY_FS_URI = "SECONDARY_FS_URI";
+
     /**
      * Stops IGFS cleaning all used resources.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
index 056ac7f..78943a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java
@@ -21,6 +21,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
+import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
 
 import java.io.*;
 import java.util.*;
@@ -31,12 +32,6 @@ import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*;
  * Data transfer object for IGFS configuration properties.
  */
 public class VisorIgfsConfiguration implements Serializable {
-    /** Property name for path to Hadoop configuration. */
-    public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
-
-    /** Property name for URI of file system. */
-    public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
-
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
index 29dfde5..bdab61d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.igfs.hadoop;
 
-import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
@@ -25,10 +24,11 @@ import org.apache.hadoop.ipc.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
+import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
 
 import java.io.*;
 import java.net.*;
@@ -38,11 +38,6 @@ import java.util.*;
  * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link org.apache.ignite.igfs.Igfs}.
  */
 public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable {
-    /** Property name for path to Hadoop configuration. */
-    public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
-
-    /** Property name for URI of file system. */
-    public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
 
     /** Hadoop file system. */
     private final FileSystem fileSys;
@@ -51,6 +46,16 @@ public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable
{
     private final Map<String, String> props = new HashMap<>();
 
     /**
+     * Simple constructor that is to be used by default.
+     *
+     * @param uri URI of file system.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public IgfsHadoopFileSystemWrapper(String uri) throws IgniteCheckedException {
+        this(uri, null);
+    }
+
+    /**
      * Constructor.
      *
      * @param uri URI of file system.
@@ -58,25 +63,22 @@ public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable
{
      * @throws IgniteCheckedException In case of error.
      */
     public IgfsHadoopFileSystemWrapper(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException {
-        Configuration cfg = new Configuration();
+        try {
+            SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath);
 
-        if (cfgPath != null)
-            cfg.addResource(U.resolveIgniteUrl(cfgPath));
+            fileSys = secProvider.createFileSystem();
 
-        try {
-            fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg);
+            uri = secProvider.uri().toString();
+
+            if (!uri.endsWith("/"))
+                uri += "/";
+
+            props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+            props.put(SECONDARY_FS_URI, uri);
         }
-        catch (IOException | URISyntaxException e) {
+        catch (IOException e) {
             throw new IgniteCheckedException(e);
         }
-
-        uri = fileSys.getUri().toString();
-
-        if (!uri.endsWith("/"))
-            uri += "/";
-
-        props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
-        props.put(SECONDARY_FS_URI, uri);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
index 1648bdc..c4d2f5e 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
@@ -26,9 +26,9 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
-import org.apache.ignite.igfs.hadoop.*;
 import org.apache.ignite.internal.igfs.common.*;
 import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -44,6 +44,7 @@ import static org.apache.ignite.configuration.IgfsConfiguration.*;
 import static org.apache.ignite.igfs.IgfsMode.*;
 import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*;
 import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
+import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
 
 /**
  * {@code IGFS} Hadoop 1.x file system driver over file system API. To use
@@ -57,7 +58,7 @@ import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
  *
  *  &lt;property&gt;
  *      &lt;name&gt;fs.igfs.impl&lt;/name&gt;
- *      &lt;value&gt;org.apache.ignite.igfs.hadoop.IgfsHadoopFileSystem&lt;/value&gt;
+ *      &lt;value&gt;org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem&lt;/value&gt;
  *  &lt;/property&gt;
  * </pre>
  * You should also add Ignite JAR and all libraries to Hadoop classpath. To
@@ -271,50 +272,29 @@ public class IgfsHadoopFileSystem extends FileSystem {
 
             boolean initSecondary = paths.defaultMode() == PROXY;
 
-            if (paths.pathModes() != null && !paths.pathModes().isEmpty()) {
+            if (!initSecondary && paths.pathModes() != null && !paths.pathModes().isEmpty())
{
                 for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
                     IgfsMode mode = pathMode.getValue();
 
-                    initSecondary |= mode == PROXY;
+                    if (mode == PROXY) {
+                        initSecondary = true;
+
+                        break;
+                    }
                 }
             }
 
             if (initSecondary) {
                 Map<String, String> props = paths.properties();
 
-                String secUri = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_URI);
-                String secConfPath = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_CONFIG_PATH);
-
-                if (secConfPath == null)
-                    throw new IOException("Failed to connect to the secondary file system
because configuration " +
-                        "path is not provided.");
-
-                if (secUri == null)
-                    throw new IOException("Failed to connect to the secondary file system
because URI is not " +
-                        "provided.");
+                String secUri = props.get(SECONDARY_FS_URI);
+                String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
 
                 try {
-                    secondaryUri = new URI(secUri);
-
-                    URL secondaryCfgUrl = U.resolveIgniteUrl(secConfPath);
-
-                    Configuration conf = new Configuration();
-
-                    if (secondaryCfgUrl != null)
-                        conf.addResource(secondaryCfgUrl);
+                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri,
secConfPath);
 
-                    String prop = String.format("fs.%s.impl.disable.cache", secondaryUri.getScheme());
-
-                    conf.setBoolean(prop, true);
-
-                    secondaryFs = FileSystem.get(secondaryUri, conf);
-                }
-                catch (URISyntaxException ignore) {
-                    if (!mgmt)
-                        throw new IOException("Failed to resolve secondary file system URI:
" + secUri);
-                    else
-                        LOG.warn("Visor failed to create secondary file system (operations
on paths with PROXY mode " +
-                            "will have no effect).");
+                    secondaryFs = secProvider.createFileSystem();
+                    secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {
                     if (!mgmt)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
index 5475cf4..0759203 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
@@ -26,9 +26,9 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
-import org.apache.ignite.igfs.hadoop.*;
 import org.apache.ignite.internal.igfs.common.*;
 import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -44,6 +44,7 @@ import static org.apache.ignite.configuration.IgfsConfiguration.*;
 import static org.apache.ignite.igfs.IgfsMode.*;
 import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*;
 import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
+import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
 
 /**
  * {@code IGFS} Hadoop 2.x file system driver over file system API. To use
@@ -57,7 +58,7 @@ import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
  *
  *  &lt;property&gt;
  *      &lt;name&gt;fs.igfs.impl&lt;/name&gt;
- *      &lt;value&gt;org.apache.ignite.igfs.hadoop.IgfsHadoopFileSystem&lt;/value&gt;
+ *      &lt;value&gt;org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem&lt;/value&gt;
  *  &lt;/property&gt;
  * </pre>
  * You should also add Ignite JAR and all libraries to Hadoop classpath. To
@@ -266,56 +267,29 @@ public class IgfsHadoopFileSystem extends AbstractFileSystem implements
Closeabl
 
             boolean initSecondary = paths.defaultMode() == PROXY;
 
-            if (paths.pathModes() != null) {
+            if (!initSecondary && paths.pathModes() != null) {
                 for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
                     IgfsMode mode = pathMode.getValue();
 
-                    initSecondary |= mode == PROXY;
+                    if (mode == PROXY) {
+                        initSecondary = true;
+
+                        break;
+                    }
                 }
             }
 
             if (initSecondary) {
                 Map<String, String> props = paths.properties();
 
-                String secUri = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_URI);
-                String secConfPath = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_CONFIG_PATH);
-
-                if (secConfPath == null)
-                    throw new IOException("Failed to connect to the secondary file system
because configuration " +
-                            "path is not provided.");
-
-                if (secUri == null)
-                    throw new IOException("Failed to connect to the secondary file system
because URI is not " +
-                            "provided.");
-
-                if (secConfPath == null)
-                    throw new IOException("Failed to connect to the secondary file system
because configuration " +
-                        "path is not provided.");
-
-                if (secUri == null)
-                    throw new IOException("Failed to connect to the secondary file system
because URI is not " +
-                        "provided.");
+                String secUri = props.get(SECONDARY_FS_URI);
+                String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
 
                 try {
-                    secondaryUri = new URI(secUri);
-
-                    URL secondaryCfgUrl = U.resolveIgniteUrl(secConfPath);
+                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri,
secConfPath);
 
-                    if (secondaryCfgUrl == null)
-                        throw new IOException("Failed to resolve secondary file system config
URL: " + secConfPath);
-
-                    Configuration conf = new Configuration();
-
-                    conf.addResource(secondaryCfgUrl);
-
-                    String prop = String.format("fs.%s.impl.disable.cache", secondaryUri.getScheme());
-
-                    conf.setBoolean(prop, true);
-
-                    secondaryFs = AbstractFileSystem.get(secondaryUri, conf);
-                }
-                catch (URISyntaxException ignore) {
-                    throw new IOException("Failed to resolve secondary file system URI: "
+ secUri);
+                    secondaryFs = secProvider.createAbstractFileSystem();
+                    secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {
                     throw new IOException("Failed to connect to the secondary file system:
" + secUri, e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
new file mode 100644
index 0000000..c1dceba
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Encapsulates logic of secondary filesystem creation.
+ */
+public class SecondaryFileSystemProvider {
+    /** Configuration of the secondary filesystem, never null. */
+    private final Configuration cfg = new Configuration();
+
+    /** The secondary filesystem URI, never null. */
+    private final URI uri;
+
+    /**
+     * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
+     * specified either explicitly or in the configuration provided.
+     *
+     * @param secUri the secondary Fs URI (optional). If not given explicitly, it must be specified as "fs.defaultFS"
+     * property in the provided configuration.
+     * @param secConfPath the secondary Fs path (file path on the local file system, optional).
+     * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved.
+     * @throws IOException
+     */
+    public SecondaryFileSystemProvider(final @Nullable String secUri,
+        final @Nullable String secConfPath) throws IOException {
+        if (secConfPath != null) {
+            URL url = U.resolveIgniteUrl(secConfPath);
+
+            if (url == null) {
+                // If secConfPath is given, it should be resolvable:
+                throw new IllegalArgumentException("Failed to resolve secondary file system " +
+                    "configuration path: " + secConfPath);
+            }
+
+            cfg.addResource(url);
+        }
+
+        // if secondary fs URI is not given explicitly, try to get it from the configuration:
+        if (secUri == null)
+            uri = FileSystem.getDefaultUri(cfg);
+        else {
+            try {
+                uri = new URI(secUri);
+            }
+            catch (URISyntaxException use) {
+                throw new IOException("Failed to resolve secondary file system URI: " + secUri);
+            }
+        }
+
+        if (uri == null)
+            throw new IllegalArgumentException("Failed to get secondary file system URI (it is neither given " +
+                "explicitly nor specified in the configuration): " + secConfPath);
+
+        // Disable caching:
+        String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme());
+
+        cfg.setBoolean(prop, true);
+    }
+
+    /**
+     * @return {@link org.apache.hadoop.fs.FileSystem}  instance for this secondary Fs.
+     * @throws IOException
+     */
+    public FileSystem createFileSystem() throws IOException {
+        FileSystem fileSys = FileSystem.get(uri, cfg);
+
+        return fileSys;
+    }
+
+    /**
+     * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
+     * @throws IOException
+     */
+    public AbstractFileSystem createAbstractFileSystem() throws IOException {
+        AbstractFileSystem secondaryFs = AbstractFileSystem.get(uri, cfg);
+
+        return secondaryFs;
+    }
+
+    /**
+     * @return the secondary fs URI, never null.
+     */
+    public URI uri() {
+        return uri;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
new file mode 100644
index 0000000..4ad74d0
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -0,0 +1,541 @@
+package org.apache.ignite.igfs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.hadoop.*;
+import org.apache.ignite.igfs.hadoop.v1.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
+import static org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint.*;
+
+/**
+ * Tests secondary file system configuration.
+ */
+public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstractTest {
+    /** IGFS scheme */
+    private static final String IGFS_SCHEME = "igfs";
+
+    /** Primary file system authority. */
+    private static final String PRIMARY_AUTHORITY = "igfs:grid0@";
+
+    /** Autogenerated primary file system configuration path. */
+    private static final String PRIMARY_CFG_PATH = "/work/core-site-primary-test.xml";
+
+    /** Secondary file system authority. */
+    private static final String SECONDARY_AUTHORITY = "igfs_secondary:grid_secondary@127.0.0.1:11500";
+
+    /** Autogenerated secondary file system configuration path. */
+    private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml";
+
+    /** Secondary endpoint configuration. */
+    protected static final Map<String, String> SECONDARY_ENDPOINT_CFG = new HashMap<String,
String>() {{
+        put("type", "tcp");
+        put("port", "11500");
+    }};
+
+    /** Group size. */
+    public static final int GRP_SIZE = 128;
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Primary file system URI. */
+    protected URI primaryFsUri;
+
+    /** Primary file system. */
+    private FileSystem primaryFs;
+
+    /** Full path of primary Fs configuration */
+    private String primaryConfFullPath;
+
+    /** Input primary Fs uri */
+    private String primaryFsUriStr;
+
+    /** Input URI scheme for configuration */
+    private String primaryCfgScheme;
+
+    /** Input URI authority for configuration */
+    private String primaryCfgAuthority;
+
+    /** if to pass configuration */
+    private boolean passPrimaryConfiguration;
+
+    /** Full path of secondary Fs configuration */
+    private String secondaryConfFullPath;
+
+    /** Input secondary Fs uri */
+    private String secondaryFsUriStr;
+
+    /** Input URI scheme for configuration */
+    private String secondaryCfgScheme;
+
+    /** Input URI authority for configuration */
+    private String secondaryCfgAuthority;
+
+    /** if to pass configuration */
+    private boolean passSecondaryConfiguration;
+
+    /** Default IGFS mode. */
+    protected final IgfsMode mode;
+
+    /** Skip embedded mode flag. */
+    private final boolean skipEmbed;
+
+    /** Skip local shmem flag. */
+    private final boolean skipLocShmem;
+
+    /**
+     * Constructor.
+     *
+     * @param mode Default IGFS mode.
+     * @param skipEmbed Whether to skip embedded mode.
+     * @param skipLocShmem Whether to skip local shmem mode.
+     */
+    protected HadoopSecondaryFileSystemConfigurationTest(IgfsMode mode, boolean skipEmbed,
boolean skipLocShmem) {
+        this.mode = mode;
+        this.skipEmbed = skipEmbed;
+        this.skipLocShmem = skipLocShmem;
+    }
+
+    /**
+     * Default constructor.
+     */
+    public HadoopSecondaryFileSystemConfigurationTest() {
+        this(PROXY, true, false);
+    }
+
+    /**
+     * Executes before each test.
+     * @throws Exception
+     */
+    private void before() throws Exception {
+        initSecondary();
+
+        if (passPrimaryConfiguration) {
+            Configuration primaryFsCfg = configuration(primaryCfgScheme, primaryCfgAuthority,
skipEmbed, skipLocShmem);
+
+            primaryConfFullPath = writeConfiguration(primaryFsCfg, PRIMARY_CFG_PATH);
+        }
+        else
+            primaryConfFullPath = null;
+
+        SecondaryFileSystemProvider provider =
+            new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath);
+
+        primaryFs = provider.createFileSystem();
+
+        primaryFsUri = provider.uri();
+    }
+
+    /**
+     * Executes after each test.
+     * @throws Exception
+     */
+    private void after() throws Exception {
+        if (primaryFs != null) {
+            try {
+                primaryFs.delete(new Path("/"), true);
+            }
+            catch (Exception ignore) {
+                // No-op.
+            }
+
+            U.closeQuiet(primaryFs);
+        }
+
+        G.stopAll(true);
+
+        delete(primaryConfFullPath);
+        delete(secondaryConfFullPath);
+    }
+
+    /**
+     * Utility method to delete file.
+     *
+     * @param file the file path to delete.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    private static void delete(String file) {
+        if (file != null) {
+            new File(file).delete();
+
+            assertFalse(new File(file).exists());
+        }
+    }
+
+    /**
+     * Initialize underlying secondary filesystem.
+     *
+     * @throws Exception
+     */
+    private void initSecondary() throws Exception {
+        if (passSecondaryConfiguration) {
+            Configuration secondaryConf = configuration(secondaryCfgScheme, secondaryCfgAuthority,
true, true);
+
+            secondaryConf.setInt("fs.igfs.block.size", 1024);
+
+            secondaryConfFullPath = writeConfiguration(secondaryConf, SECONDARY_CFG_PATH);
+        }
+        else
+            secondaryConfFullPath = null;
+
+        startNodes();
+    }
+
+    /**
+     * Starts the nodes for this test.
+     *
+     * @throws Exception If failed.
+     */
+    private void startNodes() throws Exception {
+        if (mode != PRIMARY)
+            startSecondary();
+
+        startGrids(4);
+    }
+
+    /**
+     * Starts the node that hosts the secondary IGFS.
+     * <p>
+     * The node is named {@code "grid_secondary"} and exposes an IGFS called
+     * {@code "igfs_secondary"} on {@code SECONDARY_ENDPOINT_CFG}. Its discovery SPI
+     * gets a fresh {@code TcpDiscoveryVmIpFinder} instead of {@code IP_FINDER}.
+     * The cache layout is taken from {@link #cacheConfiguration()}, so it always
+     * matches the caches of the primary nodes.
+     */
+    private void startSecondary() {
+        IgfsConfiguration igfsCfg = new IgfsConfiguration();
+
+        igfsCfg.setDataCacheName("partitioned");
+        igfsCfg.setMetaCacheName("replicated");
+        igfsCfg.setName("igfs_secondary");
+        igfsCfg.setIpcEndpointConfiguration(SECONDARY_ENDPOINT_CFG);
+        igfsCfg.setBlockSize(512 * 1024);
+        igfsCfg.setPrefetchBlocks(1);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName("grid_secondary");
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        // Reuse the shared helper instead of repeating the "replicated" (meta) and
+        // "partitioned" (data) cache settings here; the array order is {meta, data}.
+        cfg.setCacheConfiguration(cacheConfiguration());
+        cfg.setIgfsConfiguration(igfsCfg);
+        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+
+        cfg.setCommunicationSpi(communicationSpi());
+
+        G.start(cfg);
+    }
+
+    /**
+     * Get primary IPC endpoint configuration.
+     * <p>
+     * The endpoint type is {@code "tcp"} and the port is {@code DFLT_IPC_PORT}
+     * plus the index of the given grid, so each grid gets its own port.
+     *
+     * @param gridName Grid name.
+     * @return IPC primary endpoint configuration.
+     */
+    protected Map<String, String> primaryIpcEndpointConfiguration(final String gridName) {
+        // Plain HashMap rather than double-brace initialization: no anonymous subclass
+        // and no hidden reference to the enclosing test instance.
+        Map<String, String> endpointCfg = new HashMap<>();
+
+        endpointCfg.put("type", "tcp");
+        endpointCfg.put("port", String.valueOf(DFLT_IPC_PORT + getTestGridIndex(gridName)));
+
+        return endpointCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getTestGridName() {
+        // Always the constant "grid" (presumably the prefix the base class uses for indexed grid names - confirm).
+        return "grid";
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration nodeCfg = super.getConfiguration(gridName);
+
+        // Discovery goes through the IP finder shared by the test's grids.
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+        disco.setIpFinder(IP_FINDER);
+        nodeCfg.setDiscoverySpi(disco);
+
+        // Caches backing the IGFS, then the IGFS itself.
+        CacheConfiguration[] caches = cacheConfiguration();
+        nodeCfg.setCacheConfiguration(caches);
+
+        IgfsConfiguration igfs = igfsConfiguration(gridName);
+        nodeCfg.setIgfsConfiguration(igfs);
+
+        nodeCfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+        nodeCfg.setCommunicationSpi(communicationSpi());
+
+        return nodeCfg;
+    }
+
+    /**
+     * Builds the two caches backing the IGFS: a {@code "partitioned"} cache
+     * and a {@code "replicated"} cache, both transactional with full-sync writes.
+     *
+     * @return Cache configurations in the order {replicated, partitioned}.
+     */
+    protected CacheConfiguration[] cacheConfiguration() {
+        CacheConfiguration dataCfg = defaultCacheConfiguration();
+
+        dataCfg.setName("partitioned");
+        dataCfg.setAtomicityMode(TRANSACTIONAL);
+        dataCfg.setCacheMode(PARTITIONED);
+        dataCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
+        dataCfg.setBackups(0);
+        dataCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(GRP_SIZE));
+        dataCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCfg.setQueryIndexEnabled(false);
+
+        CacheConfiguration metaCfg = defaultCacheConfiguration();
+
+        metaCfg.setName("replicated");
+        metaCfg.setAtomicityMode(TRANSACTIONAL);
+        metaCfg.setCacheMode(REPLICATED);
+        metaCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCfg.setQueryIndexEnabled(false);
+
+        CacheConfiguration[] caches = new CacheConfiguration[2];
+
+        caches[0] = metaCfg;
+        caches[1] = dataCfg;
+
+        return caches;
+    }
+
+    /**
+     * Builds the configuration of the primary IGFS named {@code "igfs"}.
+     * <p>
+     * When the mode is not {@code PRIMARY}, a secondary file system wrapper created
+     * from {@code secondaryFsUriStr} and {@code secondaryConfFullPath} is attached.
+     *
+     * @param gridName Grid name.
+     * @return IGFS configuration.
+     * @throws IgniteCheckedException Declared by the signature; presumably raised when the
+     *      secondary file system wrapper cannot be created - confirm.
+     */
+    protected IgfsConfiguration igfsConfiguration(String gridName) throws IgniteCheckedException {
+        IgfsConfiguration igfsCfg = new IgfsConfiguration();
+
+        igfsCfg.setName("igfs");
+        igfsCfg.setMetaCacheName("replicated");
+        igfsCfg.setDataCacheName("partitioned");
+        igfsCfg.setDefaultMode(mode);
+        igfsCfg.setPrefetchBlocks(1);
+        igfsCfg.setManagementPort(-1);
+
+        // Together with group blocks mapper will yield 64M per node groups.
+        igfsCfg.setBlockSize(512 * 1024);
+
+        boolean hasSecondary = mode != PRIMARY;
+
+        if (hasSecondary)
+            igfsCfg.setSecondaryFileSystem(
+                new IgfsHadoopFileSystemWrapper(secondaryFsUriStr, secondaryConfFullPath));
+
+        igfsCfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
+
+        return igfsCfg;
+    }
+
+    /**
+     * Creates the communication SPI shared by the primary and secondary node configurations.
+     *
+     * @return TCP communication SPI with the shared memory port set to {@code -1}.
+     */
+    private CommunicationSpi communicationSpi() {
+        TcpCommunicationSpi tcpSpi = new TcpCommunicationSpi();
+
+        tcpSpi.setSharedMemoryPort(-1);
+
+        return tcpSpi;
+    }
+
+    /**
+     * Case #SecondaryFileSystemProvider(null, path)
+     * <p>
+     * No explicit URI string is given for either file system ({@code primaryFsUriStr} and
+     * {@code secondaryFsUriStr} are {@code null}); only the configurations carry the
+     * scheme and authority.
+     *
+     * @throws Exception On failure.
+     */
+    public void testFsConfigurationOnly() throws Exception {
+        // Primary: the configuration carries the real IGFS scheme and authority, no explicit URI.
+        primaryCfgScheme = IGFS_SCHEME;
+        primaryCfgAuthority = PRIMARY_AUTHORITY;
+        passPrimaryConfiguration = true;
+        primaryFsUriStr = null;
+
+        // Secondary: likewise the real scheme and authority in the configuration, no explicit URI.
+        secondaryCfgScheme = IGFS_SCHEME;
+        secondaryCfgAuthority = SECONDARY_AUTHORITY;
+        passSecondaryConfiguration = true;
+        secondaryFsUriStr = null;
+
+        check();
+    }
+
+    /**
+     * Case #SecondaryFileSystemProvider(uri, path): both configurations carry a bogus
+     * scheme and authority, while the explicitly passed URI strings are the real ones,
+     * so the 'uri' parameter has to override the Fs uri set in the configuration.
+     *
+     * @throws Exception On failure.
+     */
+    public void testFsUriOverridesUriInConfiguration() throws Exception {
+        final String bogusScheme = "foo";
+        final String bogusAuthority = "moo:zoo@bee";
+
+        // Primary: wrong URI in the configuration, correct explicit URI.
+        passPrimaryConfiguration = true;
+        primaryCfgScheme = bogusScheme;
+        primaryCfgAuthority = bogusAuthority;
+        primaryFsUriStr = mkUri(IGFS_SCHEME, PRIMARY_AUTHORITY);
+
+        // Secondary: wrong URI in the configuration, correct explicit URI.
+        passSecondaryConfiguration = true;
+        secondaryCfgScheme = bogusScheme;
+        secondaryCfgAuthority = bogusAuthority;
+        secondaryFsUriStr = mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY);
+
+        check();
+    }
+
+    /**
+     * Perform actual check: set the environment up with {@code before()}, create a file
+     * in a nested directory of the primary file system, write a few bytes to it and verify
+     * the reported file status (not a directory, same path, same permission). {@code after()}
+     * is always invoked, even if an assertion fails.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("deprecation")
+    private void check() throws Exception {
+        before();
+
+        try {
+            Path fsHome = new Path(primaryFsUri);
+            Path dir = new Path(fsHome, "/someDir1/someDir2/someDir3");
+            Path file = new Path(dir, "someFile");
+
+            assertPathDoesNotExist(primaryFs, file);
+
+            // Octal literal (rw-r--r--). A decimal 644 would be interpreted as octal 1204.
+            FsPermission fsPerm = new FsPermission((short)0644);
+
+            // try-with-resources closes the stream even when the write fails.
+            try (FSDataOutputStream os = primaryFs.create(file, fsPerm, false, 1, (short)1, 1L, null)) {
+                // Try to write something in file.
+                os.write("abc".getBytes());
+            }
+
+            // Check file status.
+            FileStatus fileStatus = primaryFs.getFileStatus(file);
+
+            assertFalse(fileStatus.isDir());
+            assertEquals(file, fileStatus.getPath());
+            assertEquals(fsPerm, fileStatus.getPermission());
+        }
+        finally {
+            after();
+        }
+    }
+
+    /**
+     * Create configuration for test.
+     * <p>
+     * {@code fs.defaultFS} is set only when both scheme and authority are given; the
+     * implementation classes are always registered; the per-endpoint flags are set only
+     * when an authority is given.
+     *
+     * @param scheme URI scheme for {@code fs.defaultFS}, may be {@code null}.
+     * @param authority URI authority for {@code fs.defaultFS} and the endpoint flags, may be {@code null}.
+     * @param skipEmbed Whether to skip embedded mode.
+     * @param skipLocShmem Whether to skip local shmem mode.
+     * @return Configuration.
+     */
+    private static Configuration configuration(String scheme, String authority, boolean skipEmbed,
+        boolean skipLocShmem) {
+        final Configuration hadoopCfg = new Configuration();
+
+        boolean hasUri = scheme != null && authority != null;
+
+        if (hasUri)
+            hadoopCfg.set("fs.defaultFS", scheme + "://" + authority + "/");
+
+        setImplClasses(hadoopCfg);
+
+        // The endpoint flags are keyed by authority; without one there is nothing more to set.
+        if (authority == null)
+            return hadoopCfg;
+
+        if (skipEmbed) {
+            String noEmbedKey = String.format(IgfsHadoopUtils.PARAM_IGFS_ENDPOINT_NO_EMBED, authority);
+
+            hadoopCfg.setBoolean(noEmbedKey, true);
+        }
+
+        if (skipLocShmem) {
+            String noShmemKey = String.format(IgfsHadoopUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority);
+
+            hadoopCfg.setBoolean(noShmemKey, true);
+        }
+
+        return hadoopCfg;
+    }
+
+    /**
+     * Registers the IGFS implementation classes in the given Hadoop configuration:
+     * {@code fs.igfs.impl} and {@code fs.AbstractFileSystem.igfs.impl}.
+     *
+     * @param cfg the configuration to set parameters into.
+     */
+    private static void setImplClasses(Configuration cfg) {
+        String fsImpl = IgfsHadoopFileSystem.class.getName();
+        String abstractFsImpl = org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem.class.getName();
+
+        cfg.set("fs.igfs.impl", fsImpl);
+        cfg.set("fs.AbstractFileSystem.igfs.impl", abstractFsImpl);
+    }
+
+    /**
+     * Asserts that asking the given FileSystem for the status of the path
+     * fails with {@link FileNotFoundException}.
+     *
+     * @param fs FileSystem to check.
+     * @param path Path to check.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    private void assertPathDoesNotExist(final FileSystem fs, final Path path) {
+        Callable<Object> statusLookup = new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fs.getFileStatus(path);
+            }
+        };
+
+        GridTestUtils.assertThrows(log, statusLookup, FileNotFoundException.class, null);
+    }
+
+    /**
+     * Writes the configuration as XML to a file under Ignite home and returns its path.
+     * Anything already present at the target path is removed via {@code delete(...)} first.
+     *
+     * @param cfg the configuration to write.
+     * @param pathFromIgniteHome path relatively to Ignite home; a leading "/" is added if missing.
+     * @return Full path of the written configuration.
+     * @throws IOException If the file cannot be opened or written.
+     */
+    private static String writeConfiguration(Configuration cfg, String pathFromIgniteHome) throws IOException {
+        String relPath = pathFromIgniteHome.startsWith("/") ? pathFromIgniteHome : "/" + pathFromIgniteHome;
+
+        final String fullPath = U.getIgniteHome() + relPath;
+
+        delete(fullPath);
+
+        File target = new File(fullPath);
+
+        try (FileOutputStream out = new FileOutputStream(target)) {
+            cfg.writeXml(out);
+        }
+
+        assertTrue(target.exists());
+
+        return fullPath;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        // 3 * 60 * 1000 = 180000; presumably milliseconds, i.e. three minutes - confirm against the base class.
+        return 3 * 60 * 1000;
+    }
+
+    /**
+     * Composes a URI string of the form {@code scheme://authority/}.
+     *
+     * @param scheme the scheme
+     * @param authority the authority
+     * @return URI String
+     */
+    private static String mkUri(String scheme, String authority) {
+        StringBuilder uri = new StringBuilder();
+
+        uri.append(scheme).append("://").append(authority).append("/");
+
+        return uri.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03b966fe/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index a03eb81..836cdaa 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -120,6 +120,8 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(ldr.loadClass(GridHadoopCommandLineTest.class.getName())));
 
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopSecondaryFileSystemConfigurationTest.class.getName())));
+
         return suite;
     }
 


Mime
View raw message