asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Michael Blow (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: NC Shutdown Hangs On Stuck Work, Tweak Shutdown Timeouts
Date Tue, 27 Sep 2016 05:24:09 GMT
Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1217

Change subject: NC Shutdown Hangs On Stuck Work, Tweak Shutdown Timeouts
......................................................................

NC Shutdown Hangs On Stuck Work, Tweak Shutdown Timeouts

1. Fix NC WorkQueue shutdown to interrupt() the worker thread, which may be
   stuck executing work
2. Increase the CC's NC-shutdown timeout (10s -> 30s) so that NCs have time
   to time out any work they are awaiting
3. Improve logging (i.e. replace printStackTrace() with LOGGER.log(...))

Change-Id: I12a9577c570c095afeac882664d29f0c8f53a4ad
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
5 files changed, 56 insertions(+), 52 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/17/1217/1

diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index f88f30f..fda9cc3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -22,6 +22,7 @@
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
@@ -46,21 +47,22 @@
 
     @Override
     public ClusterControllerInfo getClusterControllerInfo() throws Exception {
-        HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+        HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif =
+                new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
         return (ClusterControllerInfo) rpci.call(ipcHandle, gccif);
     }
 
     @Override
     public JobStatus getJobStatus(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(
-                jobId);
+        HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+                new HyracksClientInterfaceFunctions.GetJobStatusFunction(jobId);
         return (JobStatus) rpci.call(ipcHandle, gjsf);
     }
 
     @Override
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception
{
-        HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
-                acggfBytes, jobFlags);
+        HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                new HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -73,47 +75,50 @@
 
     @Override
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
-        HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf = new
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
+        HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf =
+                new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
         return (NetworkAddress) rpci.call(ipcHandle, gddsf);
     }
 
     @Override
     public void waitForCompletion(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
-                jobId);
+        HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
+                new HyracksClientInterfaceFunctions.WaitForCompletionFunction(jobId);
         rpci.call(ipcHandle, wfcf);
     }
 
     @Override
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception
{
-        HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+        HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif =
+                new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
         return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
     }
 
     @Override
     public ClusterTopology getClusterTopology() throws Exception {
-        HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf = new HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
+        HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf =
+                new HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
         return (ClusterTopology) rpci.call(ipcHandle, gctf);
     }
 
     @Override
     public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws
Exception {
-        HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(
-                binaryURLs, deploymentId);
+        HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+                new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId);
         rpci.call(ipcHandle, dbf);
     }
 
     @Override
     public void unDeployBinary(DeploymentId deploymentId) throws Exception {
-        HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(
-                deploymentId);
+        HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf =
+                new HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(deploymentId);
         rpci.call(ipcHandle, dbf);
     }
 
     @Override
     public JobInfo getJobInfo(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = new HyracksClientInterfaceFunctions.GetJobInfoFunction(
-                jobId);
+        HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
+                new HyracksClientInterfaceFunctions.GetJobInfoFunction(jobId);
         return (JobInfo) rpci.call(ipcHandle, gjsf);
     }
 
@@ -122,14 +127,15 @@
         HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf =
                 new HyracksClientInterfaceFunctions.ClusterShutdownFunction(terminateNCService);
         rpci.call(ipcHandle, csdf);
-        //give the CC some time to do final settling after it returns our request
-        for (int i = 3; ipcHandle.isConnected() && i > 0; i--) {
+        // give the CC some time to do final settling after it returns our request
+        int seconds = 30;
+        while (ipcHandle.isConnected() && --seconds > 0) {
             synchronized (this) {
-                wait(3000l); //3sec
+                wait(TimeUnit.SECONDS.toMillis(1));
             }
         }
         if (ipcHandle.isConnected()) {
-            throw new IPCException("CC refused to release connection after 9 seconds");
+            throw new IPCException("CC refused to release connection after 30 seconds");
         }
     }
 
@@ -145,6 +151,5 @@
         HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
                 new HyracksClientInterfaceFunctions.ThreadDumpFunction(node);
         return (String)rpci.call(ipcHandle, tdf);
-
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
index 29e1f83..e05dfbc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
@@ -50,7 +50,7 @@
     public void doRun() {
         try {
             if (ccs.getShutdownRun() != null) {
-                throw new IPCException("Shutdown in Progress");
+                throw new IPCException("Shutdown already in progress");
             }
             Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
             Set<String> nodeIds = new TreeSet<>();
@@ -73,12 +73,13 @@
                         /*
                          * wait for all our acks
                          */
+                        LOGGER.info("Waiting for NCs to shutdown...");
                         boolean cleanShutdown = shutdownStatus.waitForCompletion();
                         if (!cleanShutdown) {
                             /*
                              * best effort - just exit, user will have to kill misbehaving
NCs
                              */
-                            LOGGER.severe("Clean shutdown of NCs timed out- giving up!  Unresponsive
nodes: " +
+                            LOGGER.severe("Clean shutdown of NCs timed out- giving up; unresponsive
nodes: " +
                                     shutdownStatus.getRemainingNodes());
                         }
                         callback.setValue(cleanShutdown);
@@ -96,12 +97,13 @@
         }
     }
 
-    protected void shutdownNode(String key, NodeControllerState ncState) {
+    protected void shutdownNode(String nodeId, NodeControllerState ncState) {
         try {
+            LOGGER.info("Notifying NC " + nodeId + " to shutdown...");
             ncState.getNodeController().shutdown(terminateNCService);
         } catch (Exception e) {
-            LOGGER.log(
-                    Level.INFO, "Exception shutting down NC " + key + " (possibly dead?),
continuing shutdown...", e);
+            LOGGER.log(Level.INFO,
+                    "Exception shutting down NC " + nodeId + " (possibly dead?), continuing
shutdown...", e);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
index 4e5c98f..0a50f6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
@@ -21,12 +21,13 @@
 
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 public class ShutdownRun implements IShutdownStatusConditionVariable{
 
     private final Set<String> shutdownNodeIds = new TreeSet<>();
     private boolean shutdownSuccess = false;
-    private static final int SHUTDOWN_TIMER_MS = 10000; //10 seconds
+    private static final long SHUTDOWN_TIMER_MS = TimeUnit.SECONDS.toMillis(30);
 
     public ShutdownRun(Set<String> nodeIds) {
         shutdownNodeIds.addAll(nodeIds);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
index f9df54b..fe0821f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
@@ -45,7 +45,7 @@
             throw new IllegalArgumentException("Illegal thread priority number.");
         }
         this.threadPriority = threadPriority;
-        queue = new LinkedBlockingQueue<AbstractWork>();
+        queue = new LinkedBlockingQueue<>();
         thread = new WorkerThread(id);
         stopSemaphore = new Semaphore(1);
         stopped = true;
@@ -59,6 +59,7 @@
         try {
             stopSemaphore.acquire();
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new HyracksException(e);
         }
         if (DEBUG) {
@@ -73,14 +74,11 @@
         synchronized (this) {
             stopped = true;
         }
-        schedule(new AbstractWork() {
-            @Override
-            public void run() {
-            }
-        });
+        thread.interrupt();
         try {
             stopSemaphore.acquire();
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new HyracksException(e);
         }
     }
@@ -119,8 +117,8 @@
                     }
                     try {
                         r = queue.take();
-                    } catch (InterruptedException e) {
-                        continue;
+                    } catch (InterruptedException e) { // NOSONAR: aborting the thread
+                        break;
                     }
                     if (DEBUG) {
                         LOGGER.log(Level.FINEST,
@@ -133,7 +131,7 @@
                         }
                         r.run();
                     } catch (Exception e) {
-                        e.printStackTrace();
+                        LOGGER.log(Level.WARNING, "Exception while executing " + r, e);
                     }
                 }
             } finally {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index d7facf0..6571c0d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -98,7 +98,7 @@
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 
 public class NodeControllerService implements IControllerService {
-    private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
 
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
 
@@ -182,7 +182,7 @@
 
         lccm = new LifeCycleComponentManager();
         queue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the
heartbeat thread.
-        jobletMap = new Hashtable<JobId, Joblet>();
+        jobletMap = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
                 new File(new File(NodeControllerService.class.getName()), id));
@@ -192,7 +192,7 @@
         runtimeMXBean = ManagementFactory.getRuntimeMXBean();
         osMXBean = ManagementFactory.getOperatingSystemMXBean();
         registrationPending = true;
-        getNodeControllerInfosAcceptor = new MutableObject<FutureValue<Map<String,
NodeControllerInfo>>>();
+        getNodeControllerInfosAcceptor = new MutableObject<>();
         memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax()
* MEMORY_FUDGE_FACTOR));
         ioCounter = new IOCounterFactory().getIOCounter();
     }
@@ -210,7 +210,7 @@
     }
 
     private static List<IODeviceHandle> getDevices(String ioDevices) {
-        List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
+        List<IODeviceHandle> devices = new ArrayList<>();
         StringTokenizer tok = new StringTokenizer(ioDevices, ",");
         while (tok.hasMoreElements()) {
             String devPath = tok.nextToken().trim();
@@ -227,7 +227,7 @@
     }
 
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception
{
-        FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String,
NodeControllerInfo>>();
+        FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<>();
         synchronized (getNodeControllerInfosAcceptor) {
             while (getNodeControllerInfosAcceptor.getValue() != null) {
                 getNodeControllerInfosAcceptor.wait();
@@ -350,7 +350,7 @@
             LOGGER.log(Level.INFO, "Stopping NodeControllerService");
             executor.shutdownNow();
             if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
-                LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing shutdown abnormally");
+                LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing with abnormal
shutdown");
             }
             partitionManager.close();
             datasetPartitionManager.close();
@@ -480,7 +480,7 @@
             try {
                 cc.nodeHeartbeat(id, hbData);
             } catch (Exception e) {
-                e.printStackTrace();
+                LOGGER.log(Level.SEVERE, "Exception sending heartbeat", e);
             }
         }
     }
@@ -495,7 +495,7 @@
         @Override
         public void run() {
             try {
-                FutureValue<List<JobProfile>> fv = new FutureValue<List<JobProfile>>();
+                FutureValue<List<JobProfile>> fv = new FutureValue<>();
                 BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this,
fv);
                 queue.scheduleAndSync(bjpw);
                 List<JobProfile> profiles = fv.get();
@@ -503,7 +503,7 @@
                     cc.reportProfile(id, profiles);
                 }
             } catch (Exception e) {
-                e.printStackTrace();
+                LOGGER.log(Level.WARNING, "Exception reporting profile", e);
             }
         }
     }
@@ -580,8 +580,10 @@
                     final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction)
fn;
                     queue.schedule(new NodeThreadDumpWork(NodeControllerService.this, tdrf.getRequestId()));
                     return;
+
+                default:
+                    throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
             }
-            throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
 
         }
     }
@@ -611,15 +613,11 @@
 
         @Override
         public void run() {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Shutdown hook in progress");
-            }
+            LOGGER.info("Shutdown hook in progress");
             try {
                 nodeControllerService.stop();
             } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Exception in executing shutdown hook" + e);
-                }
+                LOGGER.warning("Exception in executing shutdown hook" + e);
             }
         }
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1217
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I12a9577c570c095afeac882664d29f0c8f53a4ad
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <mblow@apache.org>

Mime
View raw message