asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ima...@apache.org
Subject [1/2] incubator-asterixdb-hyracks git commit: Allow retries on IPCSystem.getHandle().
Date Fri, 01 May 2015 02:38:04 GMT
Repository: incubator-asterixdb-hyracks
Updated Branches:
  refs/heads/master 4cc70185b -> ec8d7a2f3


Allow retries on IPCSystem.getHandle().

NC will retry indefinitely to connect CC.

Change-Id: I0f4c15cacd265c3fbe85307af9f5c33577035447
Reviewed-on: https://asterix-gerrit.ics.uci.edu/250
Reviewed-by: Chris Hillery <ceej@lambda.nu>
Tested-by: Chris Hillery <ceej@lambda.nu>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/12bab0d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/12bab0d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/12bab0d3

Branch: refs/heads/master
Commit: 12bab0d36e2fe7031d8be1b2d58ff8bde0382ad8
Parents: 4cc7018
Author: Chris Hillery <chillery@lambda.nu>
Authored: Thu Apr 30 14:53:23 2015 -0700
Committer: Chris Hillery <ceej@lambda.nu>
Committed: Thu Apr 30 15:08:05 2015 -0700

----------------------------------------------------------------------
 .../control/nc/NodeControllerService.java       |  6 +-
 .../uci/ics/hyracks/ipc/impl/HandleState.java   |  1 +
 .../hyracks/ipc/impl/IPCConnectionManager.java  | 59 +++++++++++++++-----
 .../edu/uci/ics/hyracks/ipc/impl/IPCHandle.java |  5 +-
 .../edu/uci/ics/hyracks/ipc/impl/IPCSystem.java |  6 +-
 5 files changed, 54 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 02cdd70..88c05be 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -22,9 +22,7 @@ import java.lang.management.MemoryUsage;
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.List;
@@ -37,8 +35,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -260,7 +256,7 @@ public class NodeControllerService extends AbstractRemoteService {
         init();
 
         datasetNetworkManager.start();
-        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
+        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort),
-1);
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
index 8a5b979..d597741 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
@@ -19,5 +19,6 @@ enum HandleState {
     CONNECT_SENT,
     CONNECT_RECEIVED,
     CONNECTED,
+    CONNECT_FAILED,
     CLOSED,
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 81294c2..00f1f46 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -87,18 +87,39 @@ public class IPCConnectionManager {
         serverSocketChannel.close();
     }
 
-    IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException
{
+    IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException,
InterruptedException {
         IPCHandle handle;
-        synchronized (this) {
-            handle = ipcHandleMap.get(remoteAddress);
-            if (handle == null) {
-                handle = new IPCHandle(system, remoteAddress);
-                pendingConnections.add(handle);
-                networkThread.selector.wakeup();
+        int attempt = 1;
+        while (true) {
+            synchronized (this) {
+                handle = ipcHandleMap.get(remoteAddress);
+                if (handle == null) {
+                    handle = new IPCHandle(system, remoteAddress);
+                    pendingConnections.add(handle);
+                    networkThread.selector.wakeup();
+                }
+            }
+            if (handle.waitTillConnected()) {
+                return handle;
+            }
+            if (retries < 0) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Connection to " + remoteAddress + " failed, retrying...");
+                    attempt++;
+                    Thread.sleep(5000);
+                }
+            } else if (attempt < retries) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Connection to " + remoteAddress +
+                            " failed (Attempt " + attempt + " of " + retries + ")");
+                    attempt++;
+                    Thread.sleep(5000);
+                }
+            } else {
+                throw new IOException("Connection failed to " + remoteAddress);
             }
         }
-        handle.waitTillConnected();
-        return handle;
+
     }
 
     synchronized void registerHandle(IPCHandle handle) {
@@ -278,13 +299,21 @@ public class IPCConnectionManager {
                                 handle.setState(HandleState.CONNECT_RECEIVED);
                             } else if (key.isConnectable()) {
                                 SocketChannel channel = (SocketChannel) sc;
-                                if (channel.finishConnect()) {
-                                    IPCHandle handle = (IPCHandle) key.attachment();
-                                    handle.setState(HandleState.CONNECT_SENT);
-                                    registerHandle(handle);
-                                    key.interestOps(SelectionKey.OP_READ);
-                                    write(createInitialReqMessage(handle));
+                                IPCHandle handle = (IPCHandle) key.attachment();
+                                try {
+                                    if (!channel.finishConnect()) {
+                                        throw new Exception("Connection did not finish");
+                                    }
+                                }
+                                catch (Exception e) {
+                                    e.printStackTrace();
+                                    handle.setState(HandleState.CONNECT_FAILED);
+                                    continue;
                                 }
+                                handle.setState(HandleState.CONNECT_SENT);
+                                registerHandle(handle);
+                                key.interestOps(SelectionKey.OP_READ);
+                                write(createInitialReqMessage(handle));
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index 4908091..337440f 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -113,10 +113,11 @@ final class IPCHandle implements IIPCHandle {
         notifyAll();
     }
 
-    synchronized void waitTillConnected() throws InterruptedException {
-        while (!isConnected()) {
+    synchronized boolean waitTillConnected() throws InterruptedException {
+        while (state != HandleState.CONNECTED && state != HandleState.CONNECT_FAILED)
{
             wait();
         }
+        return state == HandleState.CONNECTED;
     }
 
     ByteBuffer getInBuffer() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/12bab0d3/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 9e7198b..35525c0 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -56,8 +56,12 @@ public class IPCSystem {
     }
 
     public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
+        return getHandle(remoteAddress, 0);
+    }
+
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int retries) throws IPCException
{
         try {
-            return cMgr.getIPCHandle(remoteAddress);
+            return cMgr.getIPCHandle(remoteAddress, retries);
         } catch (IOException e) {
             throw new IPCException(e);
         } catch (InterruptedException e) {


Mime
View raw message