ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-2206: Factory support for v2 file system.
Date Mon, 04 Jan 2016 09:33:39 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2206 10f4f7f38 -> f1715a410


IGNITE-2206: Factory support for v2 file system.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f1715a41
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f1715a41
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f1715a41

Branch: refs/heads/ignite-2206
Commit: f1715a410ba68b605c1fe21ec5f4a8b566aa4631
Parents: 10f4f7f
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Mon Jan 4 12:34:43 2016 +0400
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Mon Jan 4 12:34:43 2016 +0400

----------------------------------------------------------------------
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |   4 +-
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    | 102 +++++++++++--------
 2 files changed, 60 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f1715a41/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 7719874..12c7c78 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -330,8 +330,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             if (initSecondary) {
                 try {
-                    Object payload0 = paths.getPayload(getClass().getClassLoader());
-
                     factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
                 }
                 catch (IgniteCheckedException e) {
@@ -1148,7 +1146,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
      * @return {@code true} If secondary file system is initialized.
      */
     public boolean hasSecondaryFileSystem() {
-        return secondaryFs != null;
+        return factory != null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1715a41/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 347f2ae..d3e732c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.InvalidPathException;
@@ -35,7 +36,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
 import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsMode;
@@ -57,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.BufferedOutputStream;
@@ -166,8 +170,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     /** Mode resolver. */
     private IgfsModeResolver modeRslvr;
 
-    /** Secondary file system instance. */
-    private AbstractFileSystem secondaryFs;
+    /** The secondary file system factory. */
+    private HadoopFileSystemFactory factory;
 
     /** Whether custom sequential reads before prefetch value is provided. */
     private boolean seqReadsBeforePrefetchOverride;
@@ -333,26 +337,28 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             }
 
             if (initSecondary) {
-                throw new IOException("Proxy mode is not supported for the V2 file system.");
-
-                // TODO: Enable.
-//                try {
-//                    factory = (HadoopAbstractFileSystemFactory) paths.getPayload();
-//                }
-//                catch (IgniteCheckedException e) {
-//                    throw new IOException("Failed to get secondary file system factory.", e);
-//                }
-//
-//                A.ensure(secondaryUri != null, "File system factory uri should not be null.");
-//
-//                try {
-//                    secondaryFs = factory.get(user);
-//
-//                    secondaryUri = secondaryFs.getUri();
-//                }
-//                catch (IOException e) {
-//                    throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
-//                }
+                try {
+                    factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to get secondary file system factory.", e);
+                }
+
+                assert factory != null;
+
+                if (factory instanceof LifecycleAware)
+                    ((LifecycleAware) factory).start();
+
+                try {
+                    FileSystem secFs = factory.create(user);
+
+                    secondaryUri = secFs.getUri();
+
+                    A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
+                }
+                catch (IOException e) {
+                    throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
+                }
             }
         }
         finally {
@@ -371,9 +377,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             if (clientLog.isLogEnabled())
                 clientLog.close();
 
-            // TODO: Enable.
-//            if (factory instanceof LifecycleAware)
-//                ((LifecycleAware) factory).stop();
+            if (factory instanceof LifecycleAware)
+                ((LifecycleAware) factory).stop();
 
             // Reset initialized resources.
             rmtClient = null;
@@ -398,13 +403,13 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
     /** {@inheritDoc} */
     @Override public boolean setReplication(Path f, short replication) throws IOException {
-        return mode(f) == PROXY && secondaryFs.setReplication(f, replication);
+        return mode(f) == PROXY && secondaryFileSystem().setReplication(f, replication);
     }
 
     /** {@inheritDoc} */
     @Override public void setTimes(Path f, long mtime, long atime) throws IOException {
         if (mode(f) == PROXY)
-            secondaryFs.setTimes(f, mtime, atime);
+            secondaryFileSystem().setTimes(f, mtime, atime);
         else {
             if (mtime == -1 && atime == -1)
                 return;
@@ -428,7 +433,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             A.notNull(p, "p");
 
             if (mode(p) == PROXY)
-                secondaryFs.setPermission(toSecondary(p), perm);
+                secondaryFileSystem().setPermission(toSecondary(p), perm);
             else {
                 if (rmtClient.update(convert(p), permission(perm)) == null)
                     throw new IOException("Failed to set file permission (file not found?)" +
@@ -450,7 +455,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         try {
             if (mode(p) == PROXY)
-                secondaryFs.setOwner(toSecondary(p), usr, grp);
+                secondaryFileSystem().setOwner(toSecondary(p), usr, grp);
             else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, usr, PROP_GROUP_NAME, grp)) == null)
                 throw new IOException("Failed to set file permission (file not found?)" +
                     " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
@@ -471,11 +476,11 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             IgfsMode mode = modeRslvr.resolveMode(path);
 
             if (mode == PROXY) {
-                FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize);
+                FSDataInputStream is = secondaryFileSystem().open(toSecondary(f), bufSize);
 
                 if (clientLog.isLogEnabled()) {
                     // At this point we do not know file size, so we perform additional request to remote FS to get it.
-                    FileStatus status = secondaryFs.getFileStatus(toSecondary(f));
+                    FileStatus status = secondaryFileSystem().getFileStatus(toSecondary(f));
 
                     long size = status != null ? status.getLen() : -1;
 
@@ -550,8 +555,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
                     path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
 
             if (mode == PROXY) {
-                FSDataOutputStream os = secondaryFs.createInternal(toSecondary(f), flag, perm, bufSize,
-                    replication, blockSize, progress, checksumOpt, createParent);
+                FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize,
+                    replication, blockSize, progress);
 
                 if (clientLog.isLogEnabled()) {
                     long logId = IgfsLogger.nextId();
@@ -648,7 +653,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
                 if (clientLog.isLogEnabled())
                     clientLog.logRename(srcPath, PROXY, dstPath);
 
-                secondaryFs.renameInternal(toSecondary(src), toSecondary(dst));
+                secondaryFileSystem().rename(toSecondary(src), toSecondary(dst));
             }
             else {
                 if (clientLog.isLogEnabled())
@@ -678,7 +683,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
                 if (clientLog.isLogEnabled())
                     clientLog.logDelete(path, PROXY, recursive);
 
-                return secondaryFs.delete(toSecondary(f), recursive);
+                return secondaryFileSystem().delete(toSecondary(f), recursive);
             }
 
             boolean res = rmtClient.delete(path, recursive);
@@ -696,14 +701,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     /** {@inheritDoc} */
     @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException {
         // Checksum has effect for secondary FS only.
-        if (secondaryFs != null)
-            secondaryFs.setVerifyChecksum(verifyChecksum);
+        if (factory != null)
+            secondaryFileSystem().setVerifyChecksum(verifyChecksum);
     }
 
     /** {@inheritDoc} */
     @Override public FileChecksum getFileChecksum(Path f) throws IOException {
         if (mode(f) == PROXY)
-            return secondaryFs.getFileChecksum(f);
+            return secondaryFileSystem().getFileChecksum(f);
 
         return null;
     }
@@ -719,7 +724,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             IgfsMode mode = modeRslvr.resolveMode(path);
 
             if (mode == PROXY) {
-                FileStatus[] arr = secondaryFs.listStatus(toSecondary(f));
+                FileStatus[] arr = secondaryFileSystem().listStatus(toSecondary(f));
 
                 if (arr == null)
                     throw new FileNotFoundException("File " + f + " does not exist.");
@@ -782,7 +787,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
                 if (clientLog.isLogEnabled())
                     clientLog.logMakeDirectory(path, PROXY);
 
-                secondaryFs.mkdir(toSecondary(f), perm, createParent);
+                secondaryFileSystem().mkdirs(toSecondary(f), perm);
             }
             else {
                 rmtClient.mkdirs(path, permission(perm));
@@ -804,7 +809,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         try {
             if (mode(f) == PROXY)
-                return toPrimary(secondaryFs.getFileStatus(toSecondary(f)));
+                return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f)));
             else {
                 IgfsFile info = rmtClient.info(convert(f));
 
@@ -829,7 +834,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         try {
             if (modeRslvr.resolveMode(igfsPath) == PROXY)
-                return secondaryFs.getFileBlockLocations(path, start, len);
+                return secondaryFileSystem().getFileBlockLocations(path, start, len);
             else {
                 long now = System.currentTimeMillis();
 
@@ -880,7 +885,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
      * @return Secondary file system path.
      */
     private Path toSecondary(Path path) {
-        assert secondaryFs != null;
+        assert factory != null;
         assert secondaryUri != null;
 
         return convertPath(path, secondaryUri);
@@ -1052,4 +1057,15 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     public String user() {
         return user;
     }
+
+    /**
+     * Gets cached or creates a {@link FileSystem}.
+     *
+     * @return The secondary file system.
+     */
+    private FileSystem secondaryFileSystem() throws IOException{
+        assert factory != null;
+
+        return factory.create(user);
+    }
 }
\ No newline at end of file


Mime
View raw message