asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Michael Blow (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [ASTERIXDB-1076][HYR] Generate heartbeats in their own thread
Date Wed, 11 Oct 2017 01:57:23 GMT
Michael Blow has submitted this change and it was merged.

Change subject: [ASTERIXDB-1076][HYR] Generate heartbeats in their own thread
......................................................................


[ASTERIXDB-1076][HYR] Generate heartbeats in their own thread

- Generate & send NC heartbeats in their own thread to prevent starvation/scheduling issues
- Fix retries on IPC connections
- Don't spin on heartbeat send failure

Change-Id: Ieae21b1596013a699f27975fb21894244c536395
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2060
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
---
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
7 files changed, 92 insertions(+), 58 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved; Verified; Verified



diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 02a469d..bd5895e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -168,7 +168,7 @@
                 case MESSAGING_PUBLIC_PORT:
                     return "Public IP port to announce messaging listener";
                 case CLUSTER_CONNECT_RETRIES:
-                    return "Number of attempts to contact CC before giving up";
+                    return "Number of attempts to retry contacting CC before giving up";
                 case IODEVICES:
                     return "Comma separated list of IO Device mount points";
                 case NET_THREAD_COUNT:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 4707487..ede2c41 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -51,8 +51,9 @@
     }
 
     @Override
-    protected int getRetries(boolean first) {
-        return first ? clusterConnectRetries : 0;
+    protected int getMaxRetries(boolean first) {
+        // -1 == retry forever
+        return first ? clusterConnectRetries : -1;
     }
 
     @Override
@@ -104,7 +105,7 @@
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
         NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
-        ensureIpcHandle().send(-1, fn, null);
+        ensureIpcHandle(0).send(-1, fn, null);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
index d4ccbd9..83972d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -45,22 +45,25 @@
     }
 
     protected IIPCHandle ensureIpcHandle() throws HyracksDataException {
+        return ensureIpcHandle(getMaxRetries(ipcHandle == null));
+    }
+
+    protected IIPCHandle ensureIpcHandle(int maxRetries) throws HyracksDataException {
+        if (ipcHandle != null && ipcHandle.isConnected()) {
+            return ipcHandle;
+        }
         try {
             final boolean first = ipcHandle == null;
-            if (first || !ipcHandle.isConnected()) {
-                if (!first) {
-                    getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying
connection");
-                    eventListener.ipcHandleDisconnected(ipcHandle);
-                }
-                ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
-                if (ipcHandle.isConnected()) {
-                    if (first) {
-                        eventListener.ipcHandleConnected(ipcHandle);
-                    } else {
-                        getLogger().warning("ipcHandle " + ipcHandle + " restored");
-                        eventListener.ipcHandleRestored(ipcHandle);
-                    }
-                }
+            if (!first) {
+                getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
+                eventListener.ipcHandleDisconnected(ipcHandle);
+            }
+            ipcHandle = ipc.getHandle(inetSocketAddress, maxRetries);
+            if (first) {
+                eventListener.ipcHandleConnected(ipcHandle);
+            } else {
+                getLogger().warning("ipcHandle " + ipcHandle + " restored");
+                eventListener.ipcHandleRestored(ipcHandle);
             }
         } catch (IPCException e) {
             throw HyracksDataException.create(e);
@@ -68,7 +71,12 @@
         return ipcHandle;
     }
 
-    protected abstract int getRetries(boolean first);
+    /**
+     * Maximum number of times to retry a failed connection attempt
+     * @param first true if the initial connection attempt (i.e. server start)
+     * @return the maximum number of retries, if any.  <0 means retry forever
+     */
+    protected abstract int getMaxRetries(boolean first);
 
     protected abstract Logger getLogger();
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 68a5b76..41284a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -48,8 +48,9 @@
     }
 
     @Override
-    protected int getRetries(boolean first) {
-        return 0;
+    protected int getMaxRetries(boolean first) {
+        // -1 == retry forever
+        return -1;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 350343b..69137e5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -27,7 +27,6 @@
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Hashtable;
@@ -37,6 +36,7 @@
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
@@ -98,6 +98,7 @@
     private static final Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
 
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
+    private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
 
     private NCConfig ncConfig;
 
@@ -133,7 +134,7 @@
 
     private NodeParameters nodeParameters;
 
-    private HeartbeatTask heartbeatTask;
+    private Thread heartbeatThread;
 
     private final ServerContext serverCtx;
 
@@ -308,15 +309,6 @@
 
         workQueue.start();
 
-        heartbeatTask = new HeartbeatTask(ccs);
-
-        // Use reflection to set the priority of the timer thread.
-        Field threadField = timer.getClass().getDeclaredField("thread");
-        threadField.setAccessible(true);
-        Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread
of the Timer object.
-        timerThread.setPriority(Thread.MAX_PRIORITY);
-        // Schedule heartbeat generator.
-        timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
         // Schedule tracing a human-readable datetime
         timer.schedule(new TraceCurrentTimeTask(serviceCtx.getTracer()), 0, 60000);
 
@@ -362,6 +354,12 @@
                     registrationException);
             throw registrationException;
         }
+        // Start heartbeat generator.
+        heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()),
id + "-Heartbeat");
+        heartbeatThread.setPriority(Thread.MAX_PRIORITY);
+        heartbeatThread.setDaemon(true);
+        heartbeatThread.start();
+
         serviceCtx.setDistributedState(nodeParameters.getDistributedState());
         application.onRegisterNode();
         LOGGER.info("Registering with Cluster Controller complete");
@@ -401,7 +399,10 @@
              * Stop heartbeat after NC has stopped to avoid false node failure detection
              * on CC if an NC takes a long time to stop.
              */
-            heartbeatTask.cancel();
+            if (heartbeatThread != null) {
+                heartbeatThread.interrupt();
+                heartbeatThread.join(1000); // give it 1s to stop gracefully
+            }
             LOGGER.log(Level.INFO, "Stopped NodeControllerService");
         } else {
             LOGGER.log(Level.SEVERE, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
@@ -478,17 +479,16 @@
         return workQueue;
     }
 
-    public ThreadMXBean getThreadMXBean() {
-        return threadMXBean;
-    }
-
-    private class HeartbeatTask extends TimerTask {
-        private IClusterController cc;
+    private class HeartbeatTask implements Runnable {
+        private final Semaphore delayBlock = new Semaphore(0);
+        private final IClusterController cc;
+        private final long heartbeatPeriodNanos;
 
         private final HeartbeatData hbData;
 
-        public HeartbeatTask(IClusterController cc) {
+        HeartbeatTask(IClusterController cc, int heartbeatPeriod) {
             this.cc = cc;
+            this.heartbeatPeriodNanos = TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod);
             hbData = new HeartbeatData();
             hbData.gcCollectionCounts = new long[gcMXBeans.size()];
             hbData.gcCollectionTimes = new long[gcMXBeans.size()];
@@ -496,6 +496,28 @@
 
         @Override
         public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    long nextFireNanoTime = System.nanoTime() + heartbeatPeriodNanos;
+                    final boolean success = execute();
+                    sleepUntilNextFire(success ? nextFireNanoTime - System.nanoTime() : ONE_SECOND_NANOS);
+                } catch (InterruptedException e) { // NOSONAR
+                    break;
+                }
+            }
+            LOGGER.log(Level.INFO, "Heartbeat thread interrupted; shutting down");
+        }
+
+        private void sleepUntilNextFire(long delayNanos) throws InterruptedException {
+            if (delayNanos > 0) {
+                delayBlock.tryAcquire(delayNanos, TimeUnit.NANOSECONDS); //NOSONAR - ignore
result of tryAcquire
+            } else {
+                LOGGER.warning("After sending heartbeat, next one is already late by "
+                        + TimeUnit.NANOSECONDS.toMillis(-delayNanos) + "ms; sending without
delay");
+            }
+        }
+
+        private boolean execute() throws InterruptedException {
             MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
             hbData.heapInitSize = heapUsage.getInit();
             hbData.heapUsedSize = heapUsage.getUsed();
@@ -541,8 +563,13 @@
 
             try {
                 cc.nodeHeartbeat(id, hbData);
+                LOGGER.log(Level.FINE, "Successfully sent heartbeat");
+                return true;
+            } catch (InterruptedException e) {
+                throw e;
             } catch (Exception e) {
-                LOGGER.log(Level.SEVERE, "Exception sending heartbeat", e);
+                LOGGER.log(Level.SEVERE, "Exception sending heartbeat; will retry after 1s",
e);
+                return false;
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index d1659a8..36cf2fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -47,6 +47,10 @@
 public class IPCConnectionManager {
     private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
 
+    // TODO(mblow): the next two could be config parameters
+    private static final int INITIAL_RETRY_DELAY_MILLIS = 100;
+    private static final int MAX_RETRY_DELAY_MILLIS = 15000;
+
     private final IPCSystem system;
 
     private final NetworkThread networkThread;
@@ -99,9 +103,10 @@
         networkThread.selector.wakeup();
     }
 
-    IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException,
InterruptedException {
+    IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int maxRetries) throws IOException,
InterruptedException {
         IPCHandle handle;
-        int attempt = 1;
+        int retries = 0;
+        int delay = INITIAL_RETRY_DELAY_MILLIS;
         while (true) {
             synchronized (this) {
                 handle = ipcHandleMap.get(remoteAddress);
@@ -114,19 +119,11 @@
             if (handle.waitTillConnected()) {
                 return handle;
             }
-            if (retries < 0) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Connection to " + remoteAddress + " failed, retrying...");
-                    attempt++;
-                    Thread.sleep(5000);
-                }
-            } else if (attempt < retries) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Connection to " + remoteAddress + " failed (Attempt " +
attempt + " of " + retries
-                            + ")");
-                    attempt++;
-                    Thread.sleep(5000);
-                }
+            if (maxRetries < 0 || retries++ < maxRetries) {
+                LOGGER.warning("Connection to " + remoteAddress + " failed; retrying" + (maxRetries
<= 0 ? ""
+                        : " (retry attempt " + retries + " of " + maxRetries + ") after "
+ delay + "ms"));
+                Thread.sleep(delay);
+                delay = Math.min(MAX_RETRY_DELAY_MILLIS, (int) (delay * 1.5));
             } else {
                 throw new IOException("Connection failed to " + remoteAddress);
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index dea48bd..f27b268 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -68,9 +68,9 @@
         return getHandle(remoteAddress, 0);
     }
 
-    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int retries) throws IPCException
{
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries) throws IPCException
{
         try {
-            return cMgr.getIPCHandle(remoteAddress, retries);
+            return cMgr.getIPCHandle(remoteAddress, maxRetries);
         } catch (IOException e) {
             throw new IPCException(e);
         } catch (InterruptedException e) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2060
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ieae21b1596013a699f27975fb21894244c536395
Gerrit-PatchSet: 8
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <mblow@apache.org>
Gerrit-Reviewer: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Gerrit-Reviewer: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mblow@apache.org>
Gerrit-Reviewer: Murtadha Hubail <mhubail@apache.org>
Gerrit-Reviewer: Till Westmann <tillw@apache.org>

Mime
View raw message