Date: Tue, 10 Oct 2017 18:57:23 -0700
From: "Michael Blow (Code Review)"
CC: Jenkins, Dmitry Lychagin, Murtadha Hubail, Till Westmann
Reply-To: mblow@apache.org
Subject: Change in asterixdb[master]: [ASTERIXDB-1076][HYR] Generate heartbeats in their own thread
X-Gerrit-MessageType: merged
X-Gerrit-Change-Id: Ieae21b1596013a699f27975fb21894244c536395
X-Gerrit-Commit: 860fcde90bd799e00eaa61cbf81badfea1c25dda

Michael Blow has submitted this change and it was merged.

Change subject: [ASTERIXDB-1076][HYR] Generate heartbeats in their own thread
......................................................................
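The heart of this change is the new heartbeat loop in NodeControllerService.HeartbeatTask. A minimal, self-contained sketch of that scheduling pattern follows. It is not the Hyracks code: HeartbeatLoop is a hypothetical class, a BooleanSupplier stands in for execute()/cc.nodeHeartbeat(...), and "nc1" is a made-up node id. It keeps the pieces the diff introduces: a dedicated max-priority daemon thread, a next-fire deadline taken before each send so that send latency does not stretch the period, a timed wait on a never-released Semaphore (interruptible, so shutdown can cut it short), a fixed 1 s pause after a failed send instead of spinning, and interrupt() as the stop signal.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical stand-in for NodeControllerService.HeartbeatTask. The
 * BooleanSupplier replaces execute(): it "sends" one heartbeat and reports
 * whether the send succeeded.
 */
final class HeartbeatLoop implements Runnable {
    private static final long RETRY_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1);

    // Never released: tryAcquire serves only as an interruptible timed wait.
    private final Semaphore delayBlock = new Semaphore(0);
    private final BooleanSupplier sendHeartbeat;
    private final long periodNanos;

    HeartbeatLoop(BooleanSupplier sendHeartbeat, long periodMillis) {
        this.sendHeartbeat = sendHeartbeat;
        this.periodNanos = TimeUnit.MILLISECONDS.toNanos(periodMillis);
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Fix the next deadline before sending, so send latency does not stretch the period.
                long nextFireNanoTime = System.nanoTime() + periodNanos;
                boolean success = sendHeartbeat.getAsBoolean();
                // After a failure, pause 1 s instead of retrying in a tight loop.
                long delayNanos = success ? nextFireNanoTime - System.nanoTime() : RETRY_DELAY_NANOS;
                if (delayNanos > 0) {
                    delayBlock.tryAcquire(delayNanos, TimeUnit.NANOSECONDS);
                } // else: already late, send the next heartbeat immediately
            } catch (InterruptedException e) {
                break; // interrupt() is the shutdown signal
            }
        }
    }

    /** Runs the loop on its own max-priority daemon thread, as the change does. */
    static Thread start(String threadName, HeartbeatLoop loop) {
        Thread thread = new Thread(loop, threadName);
        thread.setPriority(Thread.MAX_PRIORITY);
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger sent = new AtomicInteger();
        Thread thread = start("nc1-Heartbeat", new HeartbeatLoop(() -> {
            sent.incrementAndGet();
            return true;
        }, 50));
        Thread.sleep(300);
        thread.interrupt();
        thread.join(1000); // same 1 s grace period the NC allows on shutdown
        System.out.println("heartbeats sent: " + sent.get() + ", thread alive: " + thread.isAlive());
    }
}
```

Waiting on a Semaphore with tryAcquire rather than Thread.sleep behaves the same for interruption; the useful property in both cases is that the wait is bounded by the remaining time to the deadline, so a slow send shortens the next wait instead of delaying every later heartbeat.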
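The reworked retry loop in IPCConnectionManager.getIPCHandle can be isolated the same way. The sketch below is hypothetical (RetryPolicySketch, connect, and the injected attempt/sleeper callbacks are illustration-only names, not Hyracks APIs). It reproduces the policy visible in the diff: the first retry waits 100 ms, each later delay is multiplied by 1.5 and capped at 15 s, maxRetries < 0 means retry forever, and maxRetries == 0 means a single attempt. Where the real code throws IOException it returns false, and sleeping is injected so the schedule can be checked without waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.IntConsumer;

// Hypothetical sketch of the capped backoff in IPCConnectionManager.getIPCHandle.
final class RetryPolicySketch {
    static final int INITIAL_RETRY_DELAY_MILLIS = 100;
    static final int MAX_RETRY_DELAY_MILLIS = 15000;

    /**
     * Calls attempt until it returns true or the retry budget is spent.
     * maxRetries < 0 retries forever; 0 means a single attempt with no retry.
     * sleeper receives each back-off delay in ms (assumed hook; the real
     * code calls Thread.sleep directly).
     */
    static boolean connect(BooleanSupplier attempt, int maxRetries, IntConsumer sleeper) {
        int retries = 0;
        int delay = INITIAL_RETRY_DELAY_MILLIS;
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (maxRetries < 0 || retries++ < maxRetries) {
                sleeper.accept(delay);
                // Grow by 1.5x, never beyond the cap.
                delay = Math.min(MAX_RETRY_DELAY_MILLIS, (int) (delay * 1.5));
            } else {
                return false; // the real code throws IOException here
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> slept = new ArrayList<>();
        boolean ok = connect(() -> false, 3, d -> slept.add(d));
        // prints "connected=false after delays [100, 150, 225]"
        System.out.println("connected=" + ok + " after delays " + slept);
    }
}
```

With three allowed retries and a peer that never answers, the client makes four attempts and sleeps 100, 150 and 225 ms between them; in retry-forever mode the delay reaches the 15000 ms cap on the fourteenth retry and stays there.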
[ASTERIXDB-1076][HYR] Generate heartbeats in their own thread

- Generate & send NC heartbeats in their own thread to prevent starvation / scheduling issues
- Fix retries on IPC connections
- Don't spin on heartbeat send failure

Change-Id: Ieae21b1596013a699f27975fb21894244c536395
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2060
Reviewed-by: Murtadha Hubail
Integration-Tests: Murtadha Hubail
Tested-by: Murtadha Hubail
---
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
7 files changed, 92 insertions(+), 58 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved; Verified; Verified


diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 02a469d..bd5895e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -168,7 +168,7 @@
             case MESSAGING_PUBLIC_PORT:
                 return "Public IP port to announce messaging listener";
             case CLUSTER_CONNECT_RETRIES:
-                return "Number of attempts to contact CC before giving up";
+                return "Number of attempts to retry contacting CC before giving up";
             case IODEVICES:
                 return "Comma separated list of IO Device mount points";
             case NET_THREAD_COUNT:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 4707487..ede2c41 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -51,8 +51,9 @@
     }
 
     @Override
-    protected int getRetries(boolean first) {
-        return first ? clusterConnectRetries : 0;
+    protected int getMaxRetries(boolean first) {
+        // -1 == retry forever
+        return first ? clusterConnectRetries : -1;
     }
 
     @Override
@@ -104,7 +105,7 @@
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
         NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
-        ensureIpcHandle().send(-1, fn, null);
+        ensureIpcHandle(0).send(-1, fn, null);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
index d4ccbd9..83972d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -45,22 +45,25 @@
     }
 
     protected IIPCHandle ensureIpcHandle() throws HyracksDataException {
+        return ensureIpcHandle(getMaxRetries(ipcHandle == null));
+    }
+
+    protected IIPCHandle ensureIpcHandle(int maxRetries) throws HyracksDataException {
+        if (ipcHandle != null && ipcHandle.isConnected()) {
+            return ipcHandle;
+        }
         try {
             final boolean first = ipcHandle == null;
-            if (first || !ipcHandle.isConnected()) {
-                if (!first) {
-                    getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
-                    eventListener.ipcHandleDisconnected(ipcHandle);
-                }
-                ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
-                if (ipcHandle.isConnected()) {
-                    if (first) {
-                        eventListener.ipcHandleConnected(ipcHandle);
-                    } else {
-                        getLogger().warning("ipcHandle " + ipcHandle + " restored");
-                        eventListener.ipcHandleRestored(ipcHandle);
-                    }
-                }
+            if (!first) {
+                getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
+                eventListener.ipcHandleDisconnected(ipcHandle);
+            }
+            ipcHandle = ipc.getHandle(inetSocketAddress, maxRetries);
+            if (first) {
+                eventListener.ipcHandleConnected(ipcHandle);
+            } else {
+                getLogger().warning("ipcHandle " + ipcHandle + " restored");
+                eventListener.ipcHandleRestored(ipcHandle);
             }
         } catch (IPCException e) {
             throw HyracksDataException.create(e);
@@ -68,7 +71,12 @@
         return ipcHandle;
     }
 
-    protected abstract int getRetries(boolean first);
+    /**
+     * Maximum number of times to retry a failed connection attempt
+     * @param first true if the initial connection attempt (i.e. server start)
+     * @return the maximum number of retries, if any. <0 means retry forever
+     */
+    protected abstract int getMaxRetries(boolean first);
 
     protected abstract Logger getLogger();
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 68a5b76..41284a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -48,8 +48,9 @@
     }
 
     @Override
-    protected int getRetries(boolean first) {
-        return 0;
+    protected int getMaxRetries(boolean first) {
+        // -1 == retry forever
+        return -1;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 350343b..69137e5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -27,7 +27,6 @@
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Hashtable;
@@ -37,6 +36,7 @@
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
@@ -98,6 +98,7 @@
     private static final Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
 
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
+    private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
 
     private NCConfig ncConfig;
 
@@ -133,7 +134,7 @@
 
     private NodeParameters nodeParameters;
 
-    private HeartbeatTask heartbeatTask;
+    private Thread heartbeatThread;
 
     private final ServerContext serverCtx;
 
@@ -308,15 +309,6 @@
 
         workQueue.start();
 
-        heartbeatTask = new HeartbeatTask(ccs);
-
-        // Use reflection to set the priority of the timer thread.
-        Field threadField = timer.getClass().getDeclaredField("thread");
-        threadField.setAccessible(true);
-        Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object.
-        timerThread.setPriority(Thread.MAX_PRIORITY);
-        // Schedule heartbeat generator.
-        timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
 
         // Schedule tracing a human-readable datetime
         timer.schedule(new TraceCurrentTimeTask(serviceCtx.getTracer()), 0, 60000);
@@ -362,6 +354,12 @@
                     registrationException);
             throw registrationException;
         }
+        // Start heartbeat generator.
+        heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
+        heartbeatThread.setPriority(Thread.MAX_PRIORITY);
+        heartbeatThread.setDaemon(true);
+        heartbeatThread.start();
+
         serviceCtx.setDistributedState(nodeParameters.getDistributedState());
         application.onRegisterNode();
         LOGGER.info("Registering with Cluster Controller complete");
@@ -401,7 +399,10 @@
              * Stop heartbeat after NC has stopped to avoid false node failure detection
              * on CC if an NC takes a long time to stop.
              */
-            heartbeatTask.cancel();
+            if (heartbeatThread != null) {
+                heartbeatThread.interrupt();
+                heartbeatThread.join(1000); // give it 1s to stop gracefully
+            }
             LOGGER.log(Level.INFO, "Stopped NodeControllerService");
         } else {
             LOGGER.log(Level.SEVERE, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
@@ -478,17 +479,16 @@
         return workQueue;
     }
 
-    public ThreadMXBean getThreadMXBean() {
-        return threadMXBean;
-    }
-
-    private class HeartbeatTask extends TimerTask {
-        private IClusterController cc;
+    private class HeartbeatTask implements Runnable {
+        private final Semaphore delayBlock = new Semaphore(0);
+        private final IClusterController cc;
+        private final long heartbeatPeriodNanos;
 
         private final HeartbeatData hbData;
 
-        public HeartbeatTask(IClusterController cc) {
+        HeartbeatTask(IClusterController cc, int heartbeatPeriod) {
             this.cc = cc;
+            this.heartbeatPeriodNanos = TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod);
             hbData = new HeartbeatData();
             hbData.gcCollectionCounts = new long[gcMXBeans.size()];
             hbData.gcCollectionTimes = new long[gcMXBeans.size()];
@@ -496,6 +496,28 @@
 
         @Override
         public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    long nextFireNanoTime = System.nanoTime() + heartbeatPeriodNanos;
+                    final boolean success = execute();
+                    sleepUntilNextFire(success ? nextFireNanoTime - System.nanoTime() : ONE_SECOND_NANOS);
+                } catch (InterruptedException e) { // NOSONAR
+                    break;
+                }
+            }
+            LOGGER.log(Level.INFO, "Heartbeat thread interrupted; shutting down");
+        }
+
+        private void sleepUntilNextFire(long delayNanos) throws InterruptedException {
+            if (delayNanos > 0) {
+                delayBlock.tryAcquire(delayNanos, TimeUnit.NANOSECONDS); //NOSONAR - ignore result of tryAcquire
+            } else {
+                LOGGER.warning("After sending heartbeat, next one is already late by "
+                        + TimeUnit.NANOSECONDS.toMillis(-delayNanos) + "ms; sending without delay");
+            }
+        }
+
+        private boolean execute() throws InterruptedException {
             MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
             hbData.heapInitSize = heapUsage.getInit();
             hbData.heapUsedSize = heapUsage.getUsed();
@@ -541,8 +563,13 @@
 
             try {
                 cc.nodeHeartbeat(id, hbData);
+                LOGGER.log(Level.FINE, "Successfully sent heartbeat");
+                return true;
+            } catch (InterruptedException e) {
+                throw e;
             } catch (Exception e) {
-                LOGGER.log(Level.SEVERE, "Exception sending heartbeat", e);
+                LOGGER.log(Level.SEVERE, "Exception sending heartbeat; will retry after 1s", e);
+                return false;
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index d1659a8..36cf2fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -47,6 +47,10 @@
 public class IPCConnectionManager {
     private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
 
+    // TODO(mblow): the next two could be config parameters
+    private static final int INITIAL_RETRY_DELAY_MILLIS = 100;
+    private static final int MAX_RETRY_DELAY_MILLIS = 15000;
+
     private final IPCSystem system;
 
     private final NetworkThread networkThread;
@@ -99,9 +103,10 @@
         networkThread.selector.wakeup();
     }
 
-    IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException {
+    IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int maxRetries) throws IOException, InterruptedException {
         IPCHandle handle;
-        int attempt = 1;
+        int retries = 0;
+        int delay = INITIAL_RETRY_DELAY_MILLIS;
         while (true) {
             synchronized (this) {
                 handle = ipcHandleMap.get(remoteAddress);
@@ -114,19 +119,11 @@
             if (handle.waitTillConnected()) {
                 return handle;
             }
-            if (retries < 0) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Connection to " + remoteAddress + " failed, retrying...");
-                    attempt++;
-                    Thread.sleep(5000);
-                }
-            } else if (attempt < retries) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Connection to " + remoteAddress + " failed (Attempt " + attempt + " of " + retries
-                            + ")");
-                    attempt++;
-                    Thread.sleep(5000);
-                }
+            if (maxRetries < 0 || retries++ < maxRetries) {
+                LOGGER.warning("Connection to " + remoteAddress + " failed; retrying" + (maxRetries <= 0 ? ""
+                        : " (retry attempt " + retries + " of " + maxRetries + ") after " + delay + "ms"));
+                Thread.sleep(delay);
+                delay = Math.min(MAX_RETRY_DELAY_MILLIS, (int) (delay * 1.5));
             } else {
                 throw new IOException("Connection failed to " + remoteAddress);
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index dea48bd..f27b268 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -68,9 +68,9 @@
         return getHandle(remoteAddress, 0);
     }
 
-    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int retries) throws IPCException {
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries) throws IPCException {
         try {
-            return cMgr.getIPCHandle(remoteAddress, retries);
+            return cMgr.getIPCHandle(remoteAddress, maxRetries);
         } catch (IOException e) {
             throw new IPCException(e);
         } catch (InterruptedException e) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2060

Gerrit-MessageType: merged
Gerrit-Change-Id: Ieae21b1596013a699f27975fb21894244c536395
Gerrit-PatchSet: 8
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow
Gerrit-Reviewer: Dmitry Lychagin
Gerrit-Reviewer: Jenkins
Gerrit-Reviewer: Michael Blow
Gerrit-Reviewer: Murtadha Hubail
Gerrit-Reviewer: Till Westmann