asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mb...@apache.org
Subject asterixdb git commit: Re-register NC with CC on reestablished IPCHandle
Date Wed, 14 Jun 2017 15:39:41 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 16b57d8e3 -> 2534d27c1


Re-register NC with CC on reestablished IPCHandle

In case of failed NC -> CC IPCHandle due to CC crash/restart, the NC
needs to re-register with the CC in order to rejoin the cluster, as the
CC ignores heartbeats from unregistered nodes.

- Improve toString on IPCHandle
- Add tests for killing & restarting CC / NCs to NCServiceExecutionIT
- Retrigger NCService on detected dead node
- Ensure jobIds are not reused on CC restart
- NCService shouldn't truncate NC log

Change-Id: I6f93ca9ab37e56e02bafcdecd1e2d0cf664faef6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1830
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/2534d27c
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/2534d27c
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/2534d27c

Branch: refs/heads/master
Commit: 2534d27c10ed50a8dc954007a9afa7ce98c70eba
Parents: 16b57d8
Author: Michael Blow <mblow@apache.org>
Authored: Wed Jun 14 09:08:22 2017 -0400
Committer: Michael Blow <mblow@apache.org>
Committed: Wed Jun 14 08:39:21 2017 -0700

----------------------------------------------------------------------
 .../asterix/app/nc/NCAppRuntimeContext.java     |   9 +-
 .../hyracks/bootstrap/NCApplication.java        |  12 ++-
 .../asterix/test/common/TestExecutor.java       |  43 ++++++++
 .../server/test/NCServiceExecutionIT.java       | 104 +++++++++++++++----
 .../server/test/SampleLocalClusterIT.java       |  44 ++++++--
 .../test/resources/NCServiceExecutionIT/cc.conf |   4 +
 .../hyracks/api/application/INCApplication.java |   1 +
 .../apache/hyracks/api/job/JobIdFactory.java    |  10 +-
 .../hyracks/control/cc/ClientInterfaceIPCI.java |   4 +-
 .../control/cc/ClusterControllerService.java    |  48 +++++++--
 .../control/cc/work/RegisterNodeWork.java       |   1 +
 .../common/controllers/NodeRegistration.java    |  10 +-
 .../ipc/ClusterControllerRemoteProxy.java       |   5 +-
 .../common/ipc/ControllerRemoteProxy.java       |  17 ++-
 .../IControllerRemoteProxyIPCEventListener.java |  37 +++++++
 .../hyracks/control/nc/BaseNCApplication.java   |   5 +
 .../control/nc/NodeControllerService.java       |  81 ++++++++++-----
 .../control/nc/work/DistributeJobWork.java      |   1 +
 .../hyracks/control/nc/work/StartTasksWork.java |   1 +
 .../hyracks/control/nc/service/NCService.java   |  14 ++-
 .../btree/helper/TestNCApplication.java         |   5 +
 .../org/apache/hyracks/ipc/impl/IPCHandle.java  |   5 +
 .../server/process/HyracksServerProcess.java    |  21 +++-
 .../server/process/HyracksVirtualCluster.java   |   6 +-
 24 files changed, 409 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 6ad85b3..c23da6c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -120,6 +120,7 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     private IFileMapManager fileMapManager;
     private IBufferCache bufferCache;
     private ITransactionSubsystem txnSubsystem;
+    private IMetadataNode metadataNodeStub;
 
     private ILSMIOOperationScheduler lsmIOScheduler;
     private PersistentLocalResourceRepository localResourceRepository;
@@ -464,9 +465,11 @@ public class NCAppRuntimeContext implements INcApplicationContext {
 
     @Override
     public void exportMetadataNodeStub() throws RemoteException {
-        IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
-                getMetadataProperties().getMetadataPort());
-        ((IAsterixStateProxy) getServiceContext().getDistributedState()).setMetadataNode(stub);
+        if (metadataNodeStub == null) {
+            metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
+                    getMetadataProperties().getMetadataPort());
+        }
+        ((IAsterixStateProxy) getServiceContext().getDistributedState()).setMetadataNode(metadataNodeStub);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 107b859..7c8e153 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -67,7 +67,8 @@ public class NCApplication extends BaseNCApplication {
     protected INCServiceContext ncServiceCtx;
     private INcApplicationContext runtimeContext;
     private String nodeId;
-    private boolean stopInitiated = false;
+    private boolean stopInitiated;
+    private boolean startupCompleted;
     private SystemState systemState;
     protected WebManager webManager;
 
@@ -190,6 +191,15 @@ public class NCApplication extends BaseNCApplication {
         }
         // Request startup tasks from CC
         StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState);
+        startupCompleted = true;
+    }
+
+    @Override
+    public void onRegisterNode() throws Exception {
+        if (startupCompleted) {
+            // Request startup tasks from CC
+            StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 25b9f0a..edbcc7e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -72,6 +72,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.methods.RequestBuilder;
 import org.apache.http.entity.ContentType;
@@ -1399,4 +1400,46 @@ public class TestExecutor {
         return servlet;
     }
 
+    public void waitForClusterActive(int timeoutSecs, TimeUnit timeUnit) throws Exception {
+        waitForClusterState("ACTIVE", timeoutSecs, timeUnit);
+    }
+
+    public void waitForClusterState(String desiredState, int timeout, TimeUnit timeUnit) throws Exception {
+        LOGGER.info("Waiting for cluster state " + desiredState + "...");
+        Thread t = new Thread(() -> {
+            while (true) {
+                try {
+                    final HttpClient client = HttpClients.createDefault();
+
+                    final HttpGet get = new HttpGet(createEndpointURI("/admin/cluster", null));
+                    final HttpResponse httpResponse = client.execute(get);
+                    final int statusCode = httpResponse.getStatusLine().getStatusCode();
+                    final String response = EntityUtils.toString(httpResponse.getEntity());
+                    if (statusCode != HttpStatus.SC_OK) {
+                        throw new Exception("HTTP error " + statusCode + ":\n" + response);
+                    }
+                    ObjectMapper om = new ObjectMapper();
+                    ObjectNode result = (ObjectNode) om.readTree(response);
+                    if (desiredState.equals(result.get("state").asText())) {
+                        break;
+                    }
+                } catch (Exception e) {
+                    // ignore, try again
+                }
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+        });
+        t.start();
+        timeUnit.timedJoin(t, timeout);
+        if (t.isAlive()) {
+            throw new Exception("Cluster did not become " + desiredState + " within " + timeout + " "
+                    + timeUnit.name().toLowerCase());
+        }
+        LOGGER.info("Cluster state now " + desiredState);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
index 6a619d3..9e51f8c 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
@@ -22,6 +22,8 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import org.apache.asterix.test.common.TestExecutor;
@@ -30,8 +32,11 @@ import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.asterix.testframework.xml.TestGroup;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.server.process.HyracksCCProcess;
+import org.apache.hyracks.server.process.HyracksNCServiceProcess;
 import org.apache.hyracks.server.process.HyracksVirtualCluster;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
@@ -92,11 +97,27 @@ public class NCServiceExecutionIT {
 
     private static final Logger LOGGER = Logger.getLogger(NCServiceExecutionIT.class.getName());
 
+    enum KillCommand {
+        CC,
+        NC1,
+        NC2;
+
+        @Override
+        public String toString() {
+            return "<kill " + name().toLowerCase() + ">";
+        }
+    }
+
+    private static HyracksCCProcess cc;
+    private static HyracksNCServiceProcess nc1;
+    private static HyracksNCServiceProcess nc2;
+
     private final TestCaseContext tcCtx;
     private static final TestExecutor testExecutor = new TestExecutor();
 
     private static final List<String> badTestCases = new ArrayList<>();
     private static HyracksVirtualCluster cluster;
+    private final KillCommand killType;
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -114,28 +135,20 @@ public class NCServiceExecutionIT {
         HDFSCluster.getInstance().setup(ASTERIX_APP_DIR + File.separator);
 
         cluster = new HyracksVirtualCluster(new File(APP_HOME), new File(ASTERIX_APP_DIR));
-        cluster.addNCService(
+        nc1 = cluster.addNCService(
                 new File(CONF_DIR, "ncservice1.conf"),
                 new File(LOG_DIR, "ncservice1.log"));
-        cluster.addNCService(
+
+        nc2 = cluster.addNCService(
                 new File(CONF_DIR, "ncservice2.conf"),
                 new File(LOG_DIR, "ncservice2.log"));
 
-        try {
-            Thread.sleep(2000);
-        } catch (InterruptedException ignored) {
-        }
-
         // Start CC
-        cluster.start(
+        cc = cluster.start(
                 new File(CONF_DIR, "cc.conf"),
                 new File(LOG_DIR, "cc.log"));
 
-        LOGGER.info("Sleeping while cluster comes online...");
-        try {
-            Thread.sleep(6000);
-        } catch (InterruptedException ignored) {
-        }
+        testExecutor.waitForClusterActive(30, TimeUnit.SECONDS);
     }
 
     @AfterClass
@@ -157,16 +170,40 @@ public class NCServiceExecutionIT {
 
     @Parameters(name = "NCServiceExecutionTest {index}: {0}")
     public static Collection<Object[]> tests() throws Exception {
-        Collection<Object[]> testArgs = new ArrayList<Object[]>();
+        Collection<Object[]> testArgs = new ArrayList<>();
+        Random random = getRandom();
         TestCaseContext.Builder b = new TestCaseContext.Builder();
         for (TestCaseContext ctx : b.build(new File(TESTS_DIR))) {
             if (!skip(ctx)) {
-                testArgs.add(new Object[] { ctx });
+                testArgs.add(new Object[] { ctx, ctx, null });
+            }
+            // let's kill something every 50 tests
+            if (testArgs.size() % 50 == 0) {
+                final KillCommand killCommand = KillCommand.values()[random.nextInt(KillCommand.values().length)];
+                testArgs.add(new Object[] { killCommand, null, killCommand});
             }
         }
         return testArgs;
     }
 
+    private static Random getRandom() {
+        Random random;
+        if (System.getProperty("random.seed") == null) {
+            random = new Random() {
+                @Override
+                public synchronized void setSeed(long seed) {
+                    super.setSeed(seed);
+                    System.err.println("using generated seed: " + seed + "; use -Drandom.seed to use specific seed");
+                }
+            };
+        } else {
+            final long seed = Long.getLong("random.seed");
+            System.err.println("using provided seed (-Drandom.seed): " + seed);
+            random = new Random(seed);
+        }
+        return random;
+    }
+
     private static boolean skip(TestCaseContext tcCtx) {
         // For now we skip feeds tests, external-library, and api tests.
         for (TestGroup group : tcCtx.getTestGroups()) {
@@ -180,13 +217,42 @@ public class NCServiceExecutionIT {
         return false;
     }
 
-    public NCServiceExecutionIT(TestCaseContext ctx) {
-        this.tcCtx = ctx;
+    public NCServiceExecutionIT(Object description, TestCaseContext tcCtx, KillCommand killType) {
+        this.tcCtx = tcCtx;
+        this.killType = killType;
     }
 
     @Test
     public void test() throws Exception {
-        testExecutor.executeTest(ACTUAL_RESULTS_DIR, tcCtx, null, false);
-        testExecutor.cleanup(tcCtx.toString(), badTestCases);
+        if (tcCtx != null) {
+            testExecutor.executeTest(ACTUAL_RESULTS_DIR, tcCtx, null, false);
+            testExecutor.cleanup(tcCtx.toString(), badTestCases);
+        } else {
+            switch (killType) {
+                case CC:
+                    LOGGER.info("Killing CC...");
+                    cc.stop(true);
+                    cc.start();
+                    break;
+
+                case NC1:
+                    LOGGER.info("Killing NC1...");
+                    nc1.stop(); // we can't kill due to ASTERIXDB-1941
+                    testExecutor.waitForClusterState("UNUSABLE", 60, TimeUnit.SECONDS); // wait for missed heartbeats...
+                    nc1.start(); // this restarts the NC service
+                    break;
+
+                case NC2:
+                    LOGGER.info("Killing NC2...");
+                    nc2.stop(); // we can't kill due to ASTERIXDB-1941
+                    testExecutor.waitForClusterState("UNUSABLE", 60, TimeUnit.SECONDS); // wait for missed heartbeats...
+                    nc2.start(); // this restarts the NC service
+                    break;
+
+                default:
+                    Assert.fail("killType: " + killType);
+            }
+            testExecutor.waitForClusterActive(30, TimeUnit.SECONDS);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
index 496d4cf..0beb38d 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
@@ -18,13 +18,17 @@
  */
 package org.apache.asterix.server.test;
 
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import org.apache.asterix.common.utils.Servlets;
 import org.apache.asterix.test.base.TestMethodTracer;
@@ -33,7 +37,7 @@ import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.hyracks.util.file.FileUtil;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
@@ -48,13 +52,13 @@ public class SampleLocalClusterIT {
     // Important paths and files for this test.
 
     // The "target" subdirectory of asterix-server. All outputs go here.
-    private static final String TARGET_DIR = FileUtil.joinPath("target");
+    private static final String TARGET_DIR = joinPath("target");
 
     // Directory where the NCs create and store all data, as configured by
     // src/test/resources/NCServiceExecutionIT/cc.conf.
-    private static final String OUTPUT_DIR = FileUtil.joinPath(TARGET_DIR, "sample local cluster");
+    private static final String OUTPUT_DIR = joinPath(TARGET_DIR, "sample local cluster");
 
-    private static final String LOCAL_SAMPLES_DIR = FileUtil.joinPath(OUTPUT_DIR, "opt", "local");
+    private static final String LOCAL_SAMPLES_DIR = joinPath(OUTPUT_DIR, "opt", "local");
 
     @Rule
     public TestRule watcher = new TestMethodTracer();
@@ -72,17 +76,41 @@ public class SampleLocalClusterIT {
 
         String[] pathElements = new String[] { TARGET_DIR,
                 new File(TARGET_DIR).list((dir, name) -> name.matches("asterix-server.*-binary-assembly.zip"))[0] };
-        String installerZip = FileUtil.joinPath(pathElements);
+        String installerZip = joinPath(pathElements);
 
         TestHelper.unzip(installerZip, OUTPUT_DIR);
     }
 
+    private static List<File> findLogFiles(File directory, List<File> fileList) {
+        File [] match = directory.listFiles(pathname -> pathname.isDirectory() || pathname.toString().endsWith(".log"));
+        if (match != null) {
+            for (File file : match) {
+                if (file.isDirectory()) {
+                    findLogFiles(file, fileList);
+                } else {
+                    fileList.add(file);
+                }
+            }
+        }
+        return fileList;
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception {
+
+        File destDir = new File(TARGET_DIR, joinPath("failsafe-reports", SampleLocalClusterIT.class.getSimpleName()));
+
+        for (File f : findLogFiles(new File(OUTPUT_DIR), new ArrayList<>())) {
+            FileUtils.copyFileToDirectory(f, destDir);
+        }
+    }
+
     @Test
     public void test0_startCluster() throws Exception {
-        Process process = new ProcessBuilder(FileUtil.joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh"), "-f")
+        Process process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh"), "-f")
                 .inheritIO().start();
         Assert.assertEquals(0, process.waitFor());
-        process = new ProcessBuilder(FileUtil.joinPath(LOCAL_SAMPLES_DIR, "bin/start-sample-cluster.sh")).inheritIO().start();
+        process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/start-sample-cluster.sh")).inheritIO().start();
         Assert.assertEquals(0, process.waitFor());
     }
 
@@ -100,7 +128,7 @@ public class SampleLocalClusterIT {
     @Test
     public void test2_stopCluster() throws Exception {
         Process process =
-                new ProcessBuilder(FileUtil.joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh")).inheritIO().start();
+                new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh")).inheritIO().start();
         Assert.assertEquals(0, process.waitFor());
         try {
             new URL("http://127.0.0.1:19002").openConnection().connect();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
index 72d28b9..2a1c652 100644
--- a/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
+++ b/asterixdb/asterix-server/src/test/resources/NCServiceExecutionIT/cc.conf
@@ -39,3 +39,7 @@ storage.memorycomponent.globalbudget = 1073741824
 [cc]
 address = 127.0.0.1
 app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+
+[common]
+log.level = INFO
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index 02416c4..64f4e29 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -33,4 +33,5 @@ public interface INCApplication extends IApplication {
      */
     IFileDeviceResolver getFileDeviceResolver();
 
+    void onRegisterNode() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
index bd69f9e..dd63786 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
@@ -18,10 +18,16 @@
  */
 package org.apache.hyracks.api.job;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 public class JobIdFactory {
-    private long id = 0;
+    private final AtomicLong id = new AtomicLong(0);
 
     public JobId create() {
-        return new JobId(id++);
+        return new JobId(id.getAndIncrement());
+    }
+
+    public void ensureMinimumId(long id) {
+        this.id.updateAndGet(current -> Math.max(current, id));
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index ced3d67..b3e65ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -54,9 +54,9 @@ class ClientInterfaceIPCI implements IIPCI {
     private final ClusterControllerService ccs;
     private final JobIdFactory jobIdFactory;
 
-    ClientInterfaceIPCI(ClusterControllerService ccs) {
+    ClientInterfaceIPCI(ClusterControllerService ccs, JobIdFactory jobIdFactory) {
         this.ccs = ccs;
-        jobIdFactory = new JobIdFactory();
+        this.jobIdFactory = jobIdFactory;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index bec0bfb..627f406 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -25,6 +25,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -40,13 +41,16 @@ import java.util.logging.Logger;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.application.ICCApplication;
+import org.apache.hyracks.api.application.IClusterLifecycleListener;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
@@ -126,12 +130,14 @@ public class ClusterControllerService implements IControllerService {
 
     private final IResourceManager resourceManager = new ResourceManager();
 
+    private final ICCApplication application;
+
+    private final JobIdFactory jobIdFactory;
+
     private IJobManager jobManager;
 
     private ShutdownRun shutdownCallback;
 
-    private final ICCApplication application;
-
     public ClusterControllerService(final CCConfig config) throws Exception {
         this(config, getApplication(config));
     }
@@ -162,6 +168,8 @@ public class ClusterControllerService implements IControllerService {
 
         // Node manager is in charge of cluster membership management.
         nodeManager = new NodeManager(ccConfig, resourceManager);
+
+        jobIdFactory = new JobIdFactory();
     }
 
     private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
@@ -184,7 +192,7 @@ public class ClusterControllerService implements IControllerService {
         IIPCI ccIPCI = new ClusterControllerIPCI(this);
         clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
-        IIPCI ciIPCI = new ClientInterfaceIPCI(this);
+        IIPCI ciIPCI = new ClientInterfaceIPCI(this, jobIdFactory);
         clientIPC = new IPCSystem(
                 new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
                 ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
@@ -228,13 +236,18 @@ public class ClusterControllerService implements IControllerService {
         }
     }
 
+    private Pair<String, Integer> getNCService(String nodeId) {
+        IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(nodeId);
+        return Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
+                ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT));
+    }
+
     private Map<String, Pair<String, Integer>> getNCServices() {
         Map<String, Pair<String, Integer>> ncMap = new TreeMap<>();
         for (String ncId : configManager.getNodeNames()) {
-            IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(ncId);
-            if (ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT) != NCConfig.NCSERVICE_PORT_DISABLED) {
-                ncMap.put(ncId, Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
-                        ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT)));
+            Pair<String, Integer> ncService = getNCService(ncId);
+            if (ncService.getRight() != NCConfig.NCSERVICE_PORT_DISABLED) {
+                ncMap.put(ncId, ncService);
             }
         }
         return ncMap;
@@ -246,6 +259,23 @@ public class ClusterControllerService implements IControllerService {
                     ncService.getValue().getLeft(), ncService.getValue().getRight(), ncService.getKey());
             workQueue.schedule(triggerWork);
         });
+        serviceCtx.addClusterLifecycleListener(new IClusterLifecycleListener() {
+            @Override
+            public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
+                // no-op, we don't care
+            }
+
+            @Override
+            public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
+                for (String nodeId : deadNodeIds) {
+                    Pair<String, Integer> ncService = getNCService(nodeId);
+
+                    final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this,
+                            ncService.getLeft(), ncService.getRight(), nodeId);
+                    workQueue.schedule(triggerWork);
+                }
+            }
+        });
     }
 
     private void terminateNCServices() throws Exception {
@@ -353,6 +383,10 @@ public class ClusterControllerService implements IControllerService {
         return new NetworkAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort());
     }
 
+    public JobIdFactory getJobIdFactory() {
+        return jobIdFactory;
+    }
+
     private final class ClusterControllerContext implements ICCContext {
         private final ClusterTopology topology;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index dc7bad0..f44fca0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -69,6 +69,7 @@ public class RegisterNodeWork extends SynchronizableWork {
             params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriod());
             params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
             result = new CCNCFunctions.NodeRegistrationResult(params, null);
+            ccs.getJobIdFactory().ensureMinimumId(reg.getMaxJobId() + 1);
         } catch (Exception e) {
             result = new CCNCFunctions.NodeRegistrationResult(null, e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index 490b6ff..89d6e78 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -72,11 +72,14 @@ public final class NodeRegistration implements Serializable {
 
     private final NodeCapacity capacity;
 
+    private final long maxJobId;
+
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
                             NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors,
                             String vmName, String vmVersion, String vmVendor, String classpath, String libraryPath,
                             String bootClasspath, List<String> inputArguments, Map<String, String> systemProperties,
-            HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity, int pid) {
+                            HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity, int pid,
+                            long maxJobId) {
         this.ncAddress = ncAddress;
         this.nodeId = nodeId;
         this.ncConfig = ncConfig;
@@ -98,6 +101,7 @@ public final class NodeRegistration implements Serializable {
         this.messagingPort = messagingPort;
         this.capacity = capacity;
         this.pid = pid;
+        this.maxJobId = maxJobId;
     }
 
     public InetSocketAddress getNodeControllerAddress() {
@@ -181,4 +185,8 @@ public final class NodeRegistration implements Serializable {
     }
 
     public int getPid() { return pid; }
+
+    public long getMaxJobId() {
+        return maxJobId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 98d258f..31cb855 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -44,8 +44,9 @@ public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implemen
 
     private final int clusterConnectRetries;
 
-    public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, int clusterConnectRetries) {
-        super(ipc, inetSocketAddress);
+    public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, int clusterConnectRetries,
+                                        IControllerRemoteProxyIPCEventListener eventListener) {
+        super(ipc, inetSocketAddress, eventListener);
         this.clusterConnectRetries = clusterConnectRetries;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
index 44b0e4a..2815ae1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -28,11 +28,18 @@ import org.apache.hyracks.ipc.impl.IPCSystem;
 public abstract class ControllerRemoteProxy {
     protected final IPCSystem ipc;
     protected final InetSocketAddress inetSocketAddress;
+    private final IControllerRemoteProxyIPCEventListener eventListener;
     private IIPCHandle ipcHandle;
 
     protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) {
+        this(ipc, inetSocketAddress, null);
+    }
+
+    protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress,
+                                    IControllerRemoteProxyIPCEventListener eventListener) {
         this.ipc = ipc;
         this.inetSocketAddress = inetSocketAddress;
+        this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() {} : eventListener;
     }
 
     protected IIPCHandle ensureIpcHandle() throws IPCException {
@@ -40,10 +47,16 @@ public abstract class ControllerRemoteProxy {
         if (first || !ipcHandle.isConnected()) {
             if (!first) {
                 getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
+                eventListener.ipcHandleDisconnected(ipcHandle);
             }
             ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
-            if (!first && ipcHandle.isConnected()) {
-                getLogger().warning("ipcHandle " + ipcHandle + " restored");
+            if (ipcHandle.isConnected()) {
+                if (first) {
+                    eventListener.ipcHandleConnected(ipcHandle);
+                } else {
+                    getLogger().warning("ipcHandle " + ipcHandle + " restored");
+                    eventListener.ipcHandleRestored(ipcHandle);
+                }
             }
         }
         return ipcHandle;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
new file mode 100644
index 0000000..ec4f9e4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.ipc;
+
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+
+public interface IControllerRemoteProxyIPCEventListener {
+
+    default void ipcHandleConnected(IIPCHandle handle) throws IPCException {
+        // no-op
+    }
+
+    default void ipcHandleDisconnected(IIPCHandle handle) throws IPCException {
+        // no-op
+    }
+
+    default void ipcHandleRestored(IIPCHandle handle) throws IPCException {
+        // no-op
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 20f6378..5afc98d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -53,6 +53,11 @@ public class BaseNCApplication implements INCApplication {
     }
 
     @Override
+    public void onRegisterNode() throws Exception {
+        // no-op
+    }
+
+    @Override
     public void stop() throws Exception {
         // no-op
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0587a55..fc911c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -37,6 +37,7 @@ import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -65,6 +66,7 @@ import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
+import org.apache.hyracks.control.common.ipc.IControllerRemoteProxyIPCEventListener;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.utils.PidHelper;
 import org.apache.hyracks.control.common.work.FutureValue;
@@ -80,7 +82,9 @@ import org.apache.hyracks.control.nc.net.NetworkManager;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
+import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
@@ -157,6 +161,10 @@ public class NodeControllerService implements IControllerService {
 
     private final ConfigManager configManager;
 
+    private NodeRegistration nodeRegistration;
+
+    private final AtomicLong maxJobId = new AtomicLong(-1);
+
     public NodeControllerService(NCConfig config) throws Exception {
         this(config, getApplication(config));
     }
@@ -189,7 +197,6 @@ public class NodeControllerService implements IControllerService {
         threadMXBean = ManagementFactory.getThreadMXBean();
         runtimeMXBean = ManagementFactory.getRuntimeMXBean();
         osMXBean = ManagementFactory.getOperatingSystemMXBean();
-        registrationPending = true;
         getNodeControllerInfosAcceptor = new MutableObject<>();
         memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
         ioCounter = new IOCounterFactory().getIOCounter();
@@ -280,7 +287,43 @@ public class NodeControllerService implements IControllerService {
         }
         this.ccs = new ClusterControllerRemoteProxy(ipc,
                 new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
-                ncConfig.getClusterConnectRetries());
+                ncConfig.getClusterConnectRetries(), new IControllerRemoteProxyIPCEventListener() {
+            @Override
+            public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
+                // we need to re-register in case the NC -> CC connection reset was due to CC shutdown
+                try {
+                    registerNode();
+                } catch (Exception e) {
+                    throw new IPCException(e);
+                }
+            }
+        });
+        registerNode();
+
+        workQueue.start();
+
+        heartbeatTask = new HeartbeatTask(ccs);
+
+        // Use reflection to set the priority of the timer thread.
+        Field threadField = timer.getClass().getDeclaredField("thread");
+        threadField.setAccessible(true);
+        Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object.
+        timerThread.setPriority(Thread.MAX_PRIORITY);
+        // Schedule heartbeat generator.
+        timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
+
+        if (nodeParameters.getProfileDumpPeriod() > 0) {
+            // Schedule profile dump generator.
+            timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
+        }
+
+        LOGGER.log(Level.INFO, "Started NodeControllerService");
+        application.startupCompleted();
+    }
+
+    public void registerNode() throws Exception {
+        LOGGER.info("Registering with Cluster Controller");
+        registrationPending = true;
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
             gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
@@ -292,12 +335,14 @@ public class NodeControllerService implements IControllerService {
         NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
                 : null;
         int allCores = osMXBean.getAvailableProcessors();
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+        nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
                 osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores,
                 runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
                 runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
                 runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort,
-                application.getCapacity(), PidHelper.getPid()));
+                application.getCapacity(), PidHelper.getPid(), maxJobId.get());
+
+        ccs.registerNode(nodeRegistration);
 
         synchronized (this) {
             while (registrationPending) {
@@ -305,29 +350,13 @@ public class NodeControllerService implements IControllerService {
             }
         }
         if (registrationException != null) {
+            LOGGER.log(Level.WARNING, "Registering with Cluster Controller failed with exception",
+                    registrationException);
             throw registrationException;
         }
         serviceCtx.setDistributedState(nodeParameters.getDistributedState());
-
-        workQueue.start();
-
-        heartbeatTask = new HeartbeatTask(ccs);
-
-        // Use reflection to set the priority of the timer thread.
-        Field threadField = timer.getClass().getDeclaredField("thread");
-        threadField.setAccessible(true);
-        Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object.
-        timerThread.setPriority(Thread.MAX_PRIORITY);
-        // Schedule heartbeat generator.
-        timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
-
-        if (nodeParameters.getProfileDumpPeriod() > 0) {
-            // Schedule profile dump generator.
-            timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
-        }
-
-        LOGGER.log(Level.INFO, "Started NodeControllerService");
-        application.startupCompleted();
+        application.onRegisterNode();
+        LOGGER.info("Registering with Cluster Controller complete");
     }
 
     private void startApplication() throws Exception {
@@ -336,6 +365,10 @@ public class NodeControllerService implements IControllerService {
         executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
     }
 
+    public void updateMaxJobId(JobId jobId) {
+        maxJobId.getAndUpdate(currentMaxId -> Math.max(currentMaxId, jobId.getId()));
+    }
+
     @Override
     public synchronized void stop() throws Exception {
         if (!shuttedDown) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
index 807142b..486a420 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DistributeJobWork.java
@@ -46,6 +46,7 @@ public class DistributeJobWork extends AbstractWork {
     public void run() {
         try {
             ncs.checkForDuplicateDistributedJob(jobId);
+            ncs.updateMaxJobId(jobId);
             ActivityClusterGraph acg =
                     (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, null, ncs.getContext());
             ncs.storeActivityClusterGraph(jobId, acg);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 95f3e83..b55cd4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -103,6 +103,7 @@ public class StartTasksWork extends AbstractWork {
     public void run() {
         Task task = null;
         try {
+            ncs.updateMaxJobId(jobId);
             NCServiceContext serviceCtx = ncs.getContext();
             Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, serviceCtx, acgBytes);
             final ActivityClusterGraph acg = joblet.getActivityClusterGraph();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
index 00af38b..9da3502 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.nc.service;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -30,6 +31,7 @@ import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
@@ -170,8 +172,11 @@ public class NCService {
                     // If the directory IS there, all is well
                 }
                 File logfile = new File(config.logdir, "nc-" + ncId + ".log");
-                // Don't care if this succeeds or fails:
-                logfile.delete();
+                try (FileWriter writer = new FileWriter(logfile, true)) {
+                    writer.write("---------------------\n");
+                    writer.write(new Date() + "\n");
+                    writer.write("---------------------\n");
+                }
                 pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logfile));
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Logging to " + logfile.getCanonicalPath());
@@ -254,6 +259,11 @@ public class NCService {
             public void run() {
                 if (proc != null) {
                     proc.destroy();
+                    try {
+                        proc.waitFor();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index b7aa342..2967039 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -40,6 +40,11 @@ public class TestNCApplication implements INCApplication {
     }
 
     @Override
+    public void onRegisterNode() throws Exception {
+        // No-op
+    }
+
+    @Override
     public void stop() throws Exception {
         // No-op
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index 1fcdb3c..efd9830 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -192,4 +192,9 @@ final class IPCHandle implements IIPCHandle {
     boolean full() {
         return full;
     }
+
+    @Override
+    public String toString() {
+        return "IPCHandle [addr=" + remoteAddress + " state=" + state + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksServerProcess.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksServerProcess.java b/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksServerProcess.java
index 13cb445..7bb3332 100644
--- a/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksServerProcess.java
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksServerProcess.java
@@ -19,10 +19,8 @@
 package org.apache.hyracks.server.process;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -51,7 +49,9 @@ abstract class HyracksServerProcess {
                 LOGGER.info("Logging to: " + logFile.getCanonicalPath());
             }
             logFile.getParentFile().mkdirs();
-            logFile.delete();
+            try (FileWriter writer = new FileWriter(logFile, true)) {
+                writer.write("---------------------\n");
+            }
             pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
         } else {
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -71,6 +71,19 @@ abstract class HyracksServerProcess {
         }
     }
 
+    public void stop(boolean forcibly) {
+        if (forcibly) {
+            process.destroyForcibly();
+        } else {
+            process.destroy();
+        }
+        try {
+            process.waitFor();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
     private String[] buildCommand() {
         List<String> cList = new ArrayList<String>();
         cList.add(getJavaCommand());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2534d27c/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksVirtualCluster.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksVirtualCluster.java b/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksVirtualCluster.java
index cc0ecf7..e1590e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksVirtualCluster.java
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksVirtualCluster.java
@@ -51,10 +51,11 @@ public class HyracksVirtualCluster {
      * @param configFile - full path to an ncservice.conf. May be null to accept all defaults.
      * @throws IOException - if there are errors starting the process.
      */
-    public void addNCService(File configFile, File logFile) throws IOException {
+    public HyracksNCServiceProcess addNCService(File configFile, File logFile) throws IOException {
         HyracksNCServiceProcess proc = new HyracksNCServiceProcess(configFile, logFile, appHome, workingDir);
         proc.start();
         ncProcs.add(proc);
+        return proc;
     }
 
     /**
@@ -64,9 +65,10 @@ public class HyracksVirtualCluster {
      *                     defaults, although this is seldom useful since there are no NCs.
      * @throws IOException - if there are errors starting the process.
      */
-    public void start(File ccConfigFile, File logFile) throws IOException {
+    public HyracksCCProcess start(File ccConfigFile, File logFile) throws IOException {
         ccProc = new HyracksCCProcess(ccConfigFile, logFile, appHome, workingDir);
         ccProc.start();
+        return ccProc;
     }
 
     /**


Mime
View raw message