ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [03/17] ignite git commit: IGNITE-1477: Fixed.
Date Tue, 15 Sep 2015 12:50:25 GMT
IGNITE-1477: Fixed.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c0e1ac18
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c0e1ac18
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c0e1ac18

Branch: refs/heads/ignite-gg-10760
Commit: c0e1ac1842df19a4de83dbbcc99090b43371c913
Parents: c065512
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Sep 15 10:25:09 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Sep 15 10:25:09 2015 +0300

----------------------------------------------------------------------
 .../hadoop/igfs/HadoopIgfsWrapper.java          | 94 +++++++++++---------
 1 file changed, 53 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c0e1ac18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
index abbb142..01189f7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
 import org.apache.ignite.internal.processors.igfs.IgfsStatus;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
@@ -339,7 +340,9 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
      * @return Delegate.
      */
     private Delegate delegate() throws HadoopIgfsCommunicationException {
-        Exception err = null;
+        // These fields will contain possible exceptions from shmem and TCP endpoints.
+        Exception errShmem = null;
+        Exception errTcp = null;
 
         // 1. If delegate is set, return it immediately.
         Delegate curDelegate = delegateRef.get();
@@ -357,8 +360,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
 
                     igfs = (IgfsEx)ignite.fileSystem(endpoint.igfs());
                 }
-                catch (Exception e) {
-                    err = e;
+                catch (Exception ignore) {
+                    // No-op.
                 }
             }
             else {
@@ -368,8 +371,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
 
                         break;
                     }
-                    catch (Exception e) {
-                        err = e;
+                    catch (Exception ignore) {
+                        // No-op.
                     }
                 }
             }
@@ -388,57 +391,54 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
                             hadoop.close(true);
 
                     if (log.isDebugEnabled())
-                        log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e);
-
-                    err = e;
+                        log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e);
                 }
             }
         }
 
         // 3. Try connecting using shmem.
-        if (!parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) {
-            if (curDelegate == null && !U.isWindows()) {
-                HadoopIgfsEx hadoop = null;
+        boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false);
 
-                try {
-                    hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
+        if (curDelegate == null && !skipLocShmem && !U.isWindows()) {
+            HadoopIgfsEx hadoop = null;
 
-                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    if (e instanceof HadoopIgfsCommunicationException)
-                        hadoop.close(true);
+            try {
+                hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to connect to out-proc local IGFS using shmem.", e);
+                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+            }
+            catch (IOException | IgniteCheckedException e) {
+                if (e instanceof HadoopIgfsCommunicationException)
+                    hadoop.close(true);
 
-                    err = e;
-                }
+                if (log.isDebugEnabled())
+                    log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e);
+
+                errShmem = e;
             }
         }
 
         // 4. Try local TCP connection.
         boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
 
-        if (!skipLocTcp) {
-            if (curDelegate == null) {
-                HadoopIgfsEx hadoop = null;
+        if (curDelegate == null && !skipLocTcp) {
+            HadoopIgfsEx hadoop = null;
 
-                try {
-                    hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
-                        log, userName);
+            try {
+                hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
+                    log, userName);
 
-                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    if (e instanceof HadoopIgfsCommunicationException)
-                        hadoop.close(true);
+                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+            }
+            catch (IOException | IgniteCheckedException e) {
+                if (e instanceof HadoopIgfsCommunicationException)
+                    hadoop.close(true);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to connect to out-proc local IGFS using TCP.", e);
+                if (log.isDebugEnabled())
+                    log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
+                        ", port=" + endpoint.port() + ']', e);
 
-                    err = e;
-                }
+                errTcp = e;
             }
         }
 
@@ -457,9 +457,10 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
                     hadoop.close(true);
 
                 if (log.isDebugEnabled())
-                    log.debug("Failed to connect to out-proc remote IGFS using TCP.", e);
+                    log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
+                        ", port=" + endpoint.port() + ']', e);
 
-                err = e;
+                errTcp = e;
             }
         }
 
@@ -469,8 +470,19 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
 
             return curDelegate;
         }
-        else
-            throw new HadoopIgfsCommunicationException("Failed to connect to IGFS: " + endpoint, err);
+        else {
+            SB errMsg = new SB("Failed to connect to IGFS [endpoint=" + authority + ", attempts=[");
+
+            if (errShmem != null)
+                errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], ");
+
+            errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] ");
+
+            errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " +
+                "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint).");
+
+            throw new HadoopIgfsCommunicationException(errMsg.toString());
+        }
     }
 
     /**


Mime
View raw message