ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [44/50] [abbrv] incubator-ignite git commit: # IGNITE-348: Applied patch from Ivan V..
Date Fri, 27 Feb 2015 14:59:19 GMT
# IGNITE-348: Applied patch from Ivan V..


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/23bee413
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/23bee413
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/23bee413

Branch: refs/heads/ignite-290
Commit: 23bee413c6e4f382036ab63433e1a38927a8347f
Parents: 4e7463d
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Fri Feb 27 13:30:53 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Fri Feb 27 13:30:53 2015 +0300

----------------------------------------------------------------------
 config/hadoop/default-config.xml                |  12 +
 .../hadoop/IgfsHadoopFileSystemWrapper.java     | 412 ++++++++++++++++++
 .../igfs/hadoop/v1/IgfsHadoopFileSystem.java    |   1 +
 .../igfs/hadoop/v2/IgfsHadoopFileSystem.java    |   1 +
 .../igfs/hadoop/IgfsHadoopFSProperties.java     |  10 +-
 .../hadoop/IgfsHadoopFileSystemWrapper.java     | 413 -------------------
 .../internal/igfs/hadoop/IgfsHadoopReader.java  |   2 +-
 .../apache/ignite/igfs/IgfsEventsTestSuite.java |   2 +-
 .../IgfsHadoop20FileSystemAbstractSelfTest.java |   2 +-
 .../igfs/IgfsHadoopDualAbstractSelfTest.java    |   2 +-
 .../IgfsHadoopFileSystemAbstractSelfTest.java   |   1 +
 ...fsHadoopFileSystemSecondaryModeSelfTest.java |   2 +-
 12 files changed, 437 insertions(+), 423 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/config/hadoop/default-config.xml
----------------------------------------------------------------------
diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml
index 5fafad8..a264749 100644
--- a/config/hadoop/default-config.xml
+++ b/config/hadoop/default-config.xml
@@ -129,6 +129,18 @@
                             <entry key="port" value="10500"/>
                         </map>
                     </property>
+
+                    <!-- Example secondary file system configuration (IGFS configured
over Hadoop HDFS): -->
+                    <!--
+                    <property name="defaultMode" value="PROXY"/>
+
+                    <property name="secondaryFileSystem">
+                        <bean class="org.apache.ignite.igfs.hadoop.IgfsHadoopFileSystemWrapper">
+                            <constructor-arg name="uri"     value="hdfs://1.2.3.4:9000"/>
+                            <constructor-arg name="cfgPath" value="/opt/hadoop-server/etc/hadoop/core-site.xml"/>
+                        </bean>
+                    </property>
+                    -->
                 </bean>
             </list>
         </property>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
new file mode 100644
index 0000000..29dfde5
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.ipc.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link
org.apache.ignite.igfs.Igfs}.
+ */
+public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable {
+    /** Property name for path to Hadoop configuration. */
+    public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
+
+    /** Property name for URI of file system. */
+    public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
+
+    /** Hadoop file system. */
+    private final FileSystem fileSys;
+
+    /** Properties of file system */
+    private final Map<String, String> props = new HashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param uri URI of file system.
+     * @param cfgPath Additional path to Hadoop configuration.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public IgfsHadoopFileSystemWrapper(@Nullable String uri, @Nullable String cfgPath) throws
IgniteCheckedException {
+        Configuration cfg = new Configuration();
+
+        if (cfgPath != null)
+            cfg.addResource(U.resolveIgniteUrl(cfgPath));
+
+        try {
+            fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg);
+        }
+        catch (IOException | URISyntaxException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        uri = fileSys.getUri().toString();
+
+        if (!uri.endsWith("/"))
+            uri += "/";
+
+        props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+        props.put(SECONDARY_FS_URI, uri);
+    }
+
+    /**
+     * Convert IGFS path into Hadoop path.
+     *
+     * @param path IGFS path.
+     * @return Hadoop path.
+     */
+    private Path convert(IgfsPath path) {
+        URI uri = fileSys.getUri();
+
+        return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
+    }
+
+    /**
+     * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate
exception.
+     *
+     * @param e Exception to check.
+     * @param detailMsg Detailed error message.
+     * @return Appropriate exception.
+     */
+    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
+        boolean wrongVer = X.hasCause(e, RemoteException.class) ||
+            (e.getMessage() != null && e.getMessage().contains("Failed on local"));
+
+        IgfsException igfsErr = !wrongVer ? cast(detailMsg, e) :
+            new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs
from local " +
+                "version.", e);
+
+        return igfsErr;
+    }
+
+    /**
+     * Cast IO exception to IGFS exception.
+     *
+     * @param e IO exception.
+     * @return IGFS exception.
+     */
+    public static IgfsException cast(String msg, IOException e) {
+        if (e instanceof FileNotFoundException)
+            return new IgfsFileNotFoundException(e);
+        else if (e instanceof ParentNotDirectoryException)
+            return new IgfsParentNotDirectoryException(msg, e);
+        else if (e instanceof PathIsNotEmptyDirectoryException)
+            return new IgfsDirectoryNotEmptyException(e);
+        else if (e instanceof PathExistsException)
+            return new IgfsPathAlreadyExistsException(msg, e);
+        else
+            return new IgfsException(msg, e);
+    }
+
+    /**
+     * Convert Hadoop FileStatus properties to map.
+     *
+     * @param status File status.
+     * @return IGFS attributes.
+     */
+    private static Map<String, String> properties(FileStatus status) {
+        FsPermission perm = status.getPermission();
+
+        if (perm == null)
+            perm = FsPermission.getDefault();
+
+        return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME,
status.getOwner(),
+            PROP_GROUP_NAME, status.getGroup());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists(IgfsPath path) {
+        try {
+            return fileSys.exists(convert(path));
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path
+ "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props)
{
+        IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props);
+
+        try {
+            if (props0.userName() != null || props0.groupName() != null)
+                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
+
+            if (props0.permission() != null)
+                fileSys.setPermission(convert(path), props0.permission());
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path
+ "]");
+        }
+
+        //Result is not used in case of secondary FS.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rename(IgfsPath src, IgfsPath dest) {
+        // Delegate to the secondary file system.
+        try {
+            if (!fileSys.rename(convert(src), convert(dest)))
+                throw new IgfsException("Failed to rename (secondary file system returned
false) " +
+                    "[src=" + src + ", dest=" + dest + ']');
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest="
+ dest + ']');
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(IgfsPath path, boolean recursive) {
+        try {
+            return fileSys.delete(convert(path), recursive);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive="
+ recursive + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path) {
+        try {
+            if (!fileSys.mkdirs(convert(path)))
+                throw new IgniteException("Failed to make directories [path=" + path + "]");
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path +
"]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props)
{
+        try {
+            if (!fileSys.mkdirs(convert(path), new IgfsHadoopFSProperties(props).permission()))
+                throw new IgniteException("Failed to make directories [path=" + path + ",
props=" + props + "]");
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path +
", props=" + props + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
+        try {
+            FileStatus[] statuses = fileSys.listStatus(convert(path));
+
+            if (statuses == null)
+                throw new IgfsFileNotFoundException("Failed to list files (path not found):
" + path);
+
+            Collection<IgfsPath> res = new ArrayList<>(statuses.length);
+
+            for (FileStatus status : statuses)
+                res.add(new IgfsPath(path, status.getPath().getName()));
+
+            return res;
+        }
+        catch (FileNotFoundException ignored) {
+            throw new IgfsFileNotFoundException("Failed to list files (path not found): "
+ path);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file
system exception: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
+        try {
+            FileStatus[] statuses = fileSys.listStatus(convert(path));
+
+            if (statuses == null)
+                throw new IgfsFileNotFoundException("Failed to list files (path not found):
" + path);
+
+            Collection<IgfsFile> res = new ArrayList<>(statuses.length);
+
+            for (FileStatus status : statuses) {
+                IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status))
:
+                    new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null,
false,
+                    properties(status));
+
+                res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()),
fsInfo, 1));
+            }
+
+            return res;
+        }
+        catch (FileNotFoundException ignored) {
+            throw new IgfsFileNotFoundException("Failed to list files (path not found): "
+ path);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file
system exception: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsReader open(IgfsPath path, int bufSize) {
+        return new IgfsHadoopReader(fileSys, convert(path), bufSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
+        try {
+            return fileSys.create(convert(path), overwrite);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite="
+ overwrite + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int
replication,
+        long blockSize, @Nullable Map<String, String> props) {
+        IgfsHadoopFSProperties props0 =
+            new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap());
+
+        try {
+            return fileSys.create(convert(path), props0.permission(), overwrite, bufSize,
(short)replication, blockSize,
+                null);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props="
+ props +
+                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" +
replication +
+                ", blockSize=" + blockSize + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
+        @Nullable Map<String, String> props) {
+        try {
+            return fileSys.append(convert(path), bufSize);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize="
+ bufSize + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(final IgfsPath path) {
+        try {
+            final FileStatus status = fileSys.getFileStatus(convert(path));
+
+            if (status == null)
+                return null;
+
+            final Map<String, String> props = properties(status);
+
+            return new IgfsFile() {
+                @Override public IgfsPath path() {
+                    return path;
+                }
+
+                @Override public boolean isFile() {
+                    return status.isFile();
+                }
+
+                @Override public boolean isDirectory() {
+                    return status.isDirectory();
+                }
+
+                @Override public int blockSize() {
+                    return (int)status.getBlockSize();
+                }
+
+                @Override public long groupBlockSize() {
+                    return status.getBlockSize();
+                }
+
+                @Override public long accessTime() {
+                    return status.getAccessTime();
+                }
+
+                @Override public long modificationTime() {
+                    return status.getModificationTime();
+                }
+
+                @Override public String property(String name) throws IllegalArgumentException
{
+                    String val = props.get(name);
+
+                    if (val ==  null)
+                        throw new IllegalArgumentException("File property not found [path="
+ path + ", name=" + name + ']');
+
+                    return val;
+                }
+
+                @Nullable @Override public String property(String name, @Nullable String
dfltVal) {
+                    String val = props.get(name);
+
+                    return val == null ? dfltVal : val;
+                }
+
+                @Override public long length() {
+                    return status.getLen();
+                }
+
+                /** {@inheritDoc} */
+                @Override public Map<String, String> properties() {
+                    return props;
+                }
+            };
+
+        }
+        catch (FileNotFoundException ignore) {
+            return null;
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long usedSpaceSize() {
+        try {
+            return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed();
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<String, String> properties() {
+        return props;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteCheckedException {
+        try {
+            fileSys.close();
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
index 8762d83..2f8b013 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.hadoop.*;
 import org.apache.ignite.internal.igfs.common.*;
 import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.igfs.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
index a38178c..ff8c50c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.hadoop.*;
 import org.apache.ignite.internal.igfs.common.*;
 import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.igfs.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
index e0ea1b6..c9d1322 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
@@ -27,7 +27,7 @@ import static org.apache.ignite.IgniteFs.*;
 /**
  * Hadoop file system properties.
  */
-class IgfsHadoopFSProperties {
+public class IgfsHadoopFSProperties {
     /** Username. */
     private String usrName;
 
@@ -43,7 +43,7 @@ class IgfsHadoopFSProperties {
      * @param props Properties.
      * @throws IgniteException In case of error.
      */
-    IgfsHadoopFSProperties(Map<String, String> props) throws IgniteException {
+    public IgfsHadoopFSProperties(Map<String, String> props) throws IgniteException
{
         usrName = props.get(PROP_USER_NAME);
         grpName = props.get(PROP_GROUP_NAME);
 
@@ -64,7 +64,7 @@ class IgfsHadoopFSProperties {
      *
      * @return User name.
      */
-    String userName() {
+    public String userName() {
         return usrName;
     }
 
@@ -73,7 +73,7 @@ class IgfsHadoopFSProperties {
      *
      * @return Group name.
      */
-    String groupName() {
+    public String groupName() {
         return grpName;
     }
 
@@ -82,7 +82,7 @@ class IgfsHadoopFSProperties {
      *
      * @return Permission.
      */
-    FsPermission permission() {
+    public FsPermission permission() {
         return perm;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
deleted file mode 100644
index 9935466..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link
org.apache.ignite.igfs.Igfs}.
- */
-public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable {
-    /** Property name for path to Hadoop configuration. */
-    public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
-
-    /** Property name for URI of file system. */
-    public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
-
-    /** Hadoop file system. */
-    private final FileSystem fileSys;
-
-    /** Properties of file system */
-    private final Map<String, String> props = new HashMap<>();
-
-    /**
-     * Constructor.
-     *
-     * @param uri URI of file system.
-     * @param cfgPath Additional path to Hadoop configuration.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public IgfsHadoopFileSystemWrapper(@Nullable String uri, @Nullable String cfgPath) throws
IgniteCheckedException {
-        Configuration cfg = new Configuration();
-
-        if (cfgPath != null)
-            cfg.addResource(U.resolveIgniteUrl(cfgPath));
-
-        try {
-            fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg);
-        }
-        catch (IOException | URISyntaxException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        uri = fileSys.getUri().toString();
-
-        if (!uri.endsWith("/"))
-            uri += "/";
-
-        props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
-        props.put(SECONDARY_FS_URI, uri);
-    }
-
-    /**
-     * Convert IGFS path into Hadoop path.
-     *
-     * @param path IGFS path.
-     * @return Hadoop path.
-     */
-    private Path convert(IgfsPath path) {
-        URI uri = fileSys.getUri();
-
-        return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
-    }
-
-    /**
-     * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate
exception.
-     *
-     * @param e Exception to check.
-     * @param detailMsg Detailed error message.
-     * @return Appropriate exception.
-     */
-    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
-        boolean wrongVer = X.hasCause(e, RemoteException.class) ||
-            (e.getMessage() != null && e.getMessage().contains("Failed on local"));
-
-        IgfsException igfsErr = !wrongVer ? cast(detailMsg, e) :
-            new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs
from local " +
-                "version.", e);
-
-
-
-        return igfsErr;
-    }
-
-    /**
-     * Cast IO exception to IGFS exception.
-     *
-     * @param e IO exception.
-     * @return IGFS exception.
-     */
-    public static IgfsException cast(String msg, IOException e) {
-        if (e instanceof FileNotFoundException)
-            return new IgfsFileNotFoundException(e);
-        else if (e instanceof ParentNotDirectoryException)
-            return new IgfsParentNotDirectoryException(msg, e);
-        else if (e instanceof PathIsNotEmptyDirectoryException)
-            return new IgfsDirectoryNotEmptyException(e);
-        else if (e instanceof PathExistsException)
-            return new IgfsPathAlreadyExistsException(msg, e);
-        else
-            return new IgfsException(msg, e);
-    }
-
-    /**
-     * Convert Hadoop FileStatus properties to map.
-     *
-     * @param status File status.
-     * @return IGFS attributes.
-     */
-    private static Map<String, String> properties(FileStatus status) {
-        FsPermission perm = status.getPermission();
-
-        if (perm == null)
-            perm = FsPermission.getDefault();
-
-        return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME,
status.getOwner(),
-            PROP_GROUP_NAME, status.getGroup());
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean exists(IgfsPath path) {
-        try {
-            return fileSys.exists(convert(path));
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path
+ "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props)
{
-        IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props);
-
-        try {
-            if (props0.userName() != null || props0.groupName() != null)
-                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
-
-            if (props0.permission() != null)
-                fileSys.setPermission(convert(path), props0.permission());
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path
+ "]");
-        }
-
-        //Result is not used in case of secondary FS.
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void rename(IgfsPath src, IgfsPath dest) {
-        // Delegate to the secondary file system.
-        try {
-            if (!fileSys.rename(convert(src), convert(dest)))
-                throw new IgfsException("Failed to rename (secondary file system returned
false) " +
-                    "[src=" + src + ", dest=" + dest + ']');
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest="
+ dest + ']');
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean delete(IgfsPath path, boolean recursive) {
-        try {
-            return fileSys.delete(convert(path), recursive);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive="
+ recursive + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void mkdirs(IgfsPath path) {
-        try {
-            if (!fileSys.mkdirs(convert(path)))
-                throw new IgniteException("Failed to make directories [path=" + path + "]");
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path +
"]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props)
{
-        try {
-            if (!fileSys.mkdirs(convert(path), new IgfsHadoopFSProperties(props).permission()))
-                throw new IgniteException("Failed to make directories [path=" + path + ",
props=" + props + "]");
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path +
", props=" + props + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
-        try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
-
-            if (statuses == null)
-                throw new IgfsFileNotFoundException("Failed to list files (path not found):
" + path);
-
-            Collection<IgfsPath> res = new ArrayList<>(statuses.length);
-
-            for (FileStatus status : statuses)
-                res.add(new IgfsPath(path, status.getPath().getName()));
-
-            return res;
-        }
-        catch (FileNotFoundException ignored) {
-            throw new IgfsFileNotFoundException("Failed to list files (path not found): "
+ path);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file
system exception: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
-        try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
-
-            if (statuses == null)
-                throw new IgfsFileNotFoundException("Failed to list files (path not found):
" + path);
-
-            Collection<IgfsFile> res = new ArrayList<>(statuses.length);
-
-            for (FileStatus status : statuses) {
-                IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status))
:
-                    new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null,
false,
-                    properties(status));
-
-                res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()),
fsInfo, 1));
-            }
-
-            return res;
-        }
-        catch (FileNotFoundException ignored) {
-            throw new IgfsFileNotFoundException("Failed to list files (path not found): "
+ path);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file
system exception: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsReader open(IgfsPath path, int bufSize) {
-        return new IgfsHadoopReader(fileSys, convert(path), bufSize);
-    }
-
-    /** {@inheritDoc} */
-    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
-        try {
-            return fileSys.create(convert(path), overwrite);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite="
+ overwrite + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int
replication,
-        long blockSize, @Nullable Map<String, String> props) {
-        IgfsHadoopFSProperties props0 =
-            new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap());
-
-        try {
-            return fileSys.create(convert(path), props0.permission(), overwrite, bufSize,
(short)replication, blockSize,
-                null);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props="
+ props +
-                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" +
replication +
-                ", blockSize=" + blockSize + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
-        @Nullable Map<String, String> props) {
-        try {
-            return fileSys.append(convert(path), bufSize);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize="
+ bufSize + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile info(final IgfsPath path) {
-        try {
-            final FileStatus status = fileSys.getFileStatus(convert(path));
-
-            if (status == null)
-                return null;
-
-            final Map<String, String> props = properties(status);
-
-            return new IgfsFile() {
-                @Override public IgfsPath path() {
-                    return path;
-                }
-
-                @Override public boolean isFile() {
-                    return status.isFile();
-                }
-
-                @Override public boolean isDirectory() {
-                    return status.isDirectory();
-                }
-
-                @Override public int blockSize() {
-                    return (int)status.getBlockSize();
-                }
-
-                @Override public long groupBlockSize() {
-                    return status.getBlockSize();
-                }
-
-                @Override public long accessTime() {
-                    return status.getAccessTime();
-                }
-
-                @Override public long modificationTime() {
-                    return status.getModificationTime();
-                }
-
-                @Override public String property(String name) throws IllegalArgumentException
{
-                    String val = props.get(name);
-
-                    if (val ==  null)
-                        throw new IllegalArgumentException("File property not found [path="
+ path + ", name=" + name + ']');
-
-                    return val;
-                }
-
-                @Nullable @Override public String property(String name, @Nullable String
dfltVal) {
-                    String val = props.get(name);
-
-                    return val == null ? dfltVal : val;
-                }
-
-                @Override public long length() {
-                    return status.getLen();
-                }
-
-                /** {@inheritDoc} */
-                @Override public Map<String, String> properties() {
-                    return props;
-                }
-            };
-
-        }
-        catch (FileNotFoundException ignore) {
-            return null;
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public long usedSpaceSize() {
-        try {
-            return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed();
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public Map<String, String> properties() {
-        return props;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IgniteCheckedException {
-        try {
-            fileSys.close();
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java
index 7234269..3ab3acc 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java
@@ -56,7 +56,7 @@ public class IgfsHadoopReader implements IgfsReader {
      * @param path Path to the file to open.
      * @param bufSize Buffer size.
      */
-    IgfsHadoopReader(FileSystem fs, Path path, int bufSize) {
+    public IgfsHadoopReader(FileSystem fs, Path path, int bufSize) {
         assert fs != null;
         assert path != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
index 05a7b1d..29696bf 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
@@ -20,7 +20,7 @@ package org.apache.ignite.igfs;
 import junit.framework.*;
 import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.typedef.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
index 207bc79..9f9a6d8 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoop20FileSystemAbstractSelfTest.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.permission.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
index 22c144f..a54e264 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopDualAbstractSelfTest.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
+import org.apache.ignite.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
index 606eb48..7359fdf 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemAbstractSelfTest.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.permission.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.hadoop.*;
 import org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem;
 import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.igfs.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23bee413/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
index 2e22d93..b88816a 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsHadoopFileSystemSecondaryModeSelfTest.java
@@ -21,8 +21,8 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.hadoop.*;
 import org.apache.ignite.igfs.hadoop.v1.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;


Mime
View raw message