asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Michael Blow (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: checkpoint
Date Sat, 03 Feb 2018 23:16:41 GMT
Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2352

Change subject: checkpoint
......................................................................

checkpoint

Change-Id: Ibcbdfb25d99a11b01ed390ce266f332fb40f17a4
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
M asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
8 files changed, 127 insertions(+), 86 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/52/2352/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 366438a..513cae7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -77,6 +77,7 @@
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -472,7 +473,9 @@
                 if (hcc == null || !hcc.isConnected()) {
                     try {
                         NodeControllerService ncSrv = (NodeControllerService) ncServiceContext.getControllerService();
-                        ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo();
+                        // TODO(mblow): multicc
+                        CcId primaryCcId = ncSrv.getPrimaryCcId();
+                        ClusterControllerInfo ccInfo = ncSrv.getNodeParameters(primaryCcId).getClusterControllerInfo();
                         hcc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
                     } catch (Exception e) {
                         throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 1cefd42..89ff077 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -152,7 +152,7 @@
         appCtx.setExtensionManager(ccExtensionManager);
         final CCConfig ccConfig = controllerService.getCCConfig();
         if (System.getProperty("java.rmi.server.hostname") == null) {
-            System.setProperty("java.rmi.server.hostname", ccConfig.getClusterListenAddress());
+            System.setProperty("java.rmi.server.hostname", ccConfig.getClusterPublicAddress());
         }
         MetadataProperties metadataProperties = appCtx.getMetadataProperties();
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index 8ab9f82..5357fc8 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -58,6 +58,7 @@
     @Override
     public IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit) throws InterruptedException {
         synchronized (this) {
+            //TODO(mblow): replace with nanoTime() to avoid being affected by system clock adjustments...
             long timeToWait = TimeUnit.MILLISECONDS.convert(waitFor, timeUnit);
             while (metadataNode == null && timeToWait > 0) {
                 long startTime = System.currentTimeMillis();
diff --git a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
index 406a762..201945c 100644
--- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
+++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
@@ -22,27 +22,28 @@
 
 import javax.xml.XMLConstants;
 import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
+import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.parsers.SAXParser;
 import javax.xml.parsers.SAXParserFactory;
 import javax.xml.transform.sax.SAXSource;
 
 import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
 
 public class TestSuiteParser {
-    public TestSuiteParser() {
-    }
 
-    public org.apache.asterix.testframework.xml.TestSuite parse(File testSuiteCatalog) throws Exception {
+    public TestSuite parse(File testSuiteCatalog) throws SAXException, JAXBException, ParserConfigurationException {
         SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
         saxParserFactory.setNamespaceAware(true);
         saxParserFactory.setXIncludeAware(true);
         SAXParser saxParser = saxParserFactory.newSAXParser();
         saxParser.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "file");
 
-        JAXBContext ctx = JAXBContext.newInstance(org.apache.asterix.testframework.xml.TestSuite.class);
+        JAXBContext ctx = JAXBContext.newInstance(TestSuite.class);
         Unmarshaller um = ctx.createUnmarshaller();
-        return (org.apache.asterix.testframework.xml.TestSuite) um.unmarshal(
+        return (TestSuite) um.unmarshal(
                 new SAXSource(saxParser.getXMLReader(), new InputSource(testSuiteCatalog.toURI().toString())));
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
index 3d69ddb..0e04dca 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
@@ -18,10 +18,14 @@
  */
 package org.apache.hyracks.api.client;
 
+import org.apache.hyracks.api.control.CcId;
+
 import java.io.Serializable;
 
 public class ClusterControllerInfo implements Serializable {
     private static final long serialVersionUID = 1L;
+
+    private final CcId ccId;
 
     private final String clientNetAddress;
 
@@ -29,12 +33,17 @@
 
     private final int webPort;
 
-    public ClusterControllerInfo(String clientNetAddress, int clientNetPort, int webPort) {
+    public ClusterControllerInfo(CcId ccId, String clientNetAddress, int clientNetPort, int webPort) {
+        this.ccId = ccId;
         this.clientNetAddress = clientNetAddress;
         this.clientNetPort = clientNetPort;
         this.webPort = webPort;
     }
 
+    public CcId getCcId() {
+        return ccId;
+    }
+
     public int getWebPort() {
         return webPort;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index a6edd70..f8fe77f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -215,7 +215,7 @@
         clusterIPC.start();
         clientIPC.start();
         webServer.start();
-        info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
+        info = new ClusterControllerInfo(ccId, ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
                 webServer.getListeningPort());
         timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriodMillis());
         jobLog.open();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0e74a4c..277cdf5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
@@ -108,7 +109,7 @@
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
     private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
 
-    private NCConfig ncConfig;
+    private final NCConfig ncConfig;
 
     private final String id;
 
@@ -128,13 +129,9 @@
 
     private final Timer timer;
 
-    private boolean registrationPending;
+    private CcId primaryCcId;
 
-    private Exception registrationException;
-
-    private IClusterController primaryCcs;
-
-    private final Map<CcId, IClusterController> ccsMap = Collections.synchronizedMap(new HashMap<>());
+    private final Map<CcId, CcConnection> ccMap = Collections.synchronizedMap(new HashMap<>());
 
     private final Map<JobId, Joblet> jobletMap;
 
@@ -144,11 +141,9 @@
 
     private ExecutorService executor;
 
-    private NodeParameters nodeParameters;
+    private Map<CcId, Thread> heartbeatThreads = new ConcurrentHashMap<>();
 
-    private Map<IClusterController, Thread> heartbeatThreads = new ConcurrentHashMap<>();
-
-    private Map<IClusterController, Timer> ccTimers = new ConcurrentHashMap<>();
+    private Map<CcId, Timer> ccTimers = new ConcurrentHashMap<>();
 
     private final ServerContext serverCtx;
 
@@ -179,8 +174,6 @@
     private MessagingNetworkManager messagingNetManager;
 
     private final ConfigManager configManager;
-
-    private NodeRegistration nodeRegistration;
 
     private final AtomicLong maxJobId = new AtomicLong(-1);
 
@@ -254,7 +247,7 @@
             }
             getNodeControllerInfosAcceptor.setValue(fv);
         }
-        primaryCcs.getNodeControllerInfos();
+        getPrimaryClusterController().getNodeControllerInfos();
         return fv.get();
     }
 
@@ -304,7 +297,8 @@
 
         final InetSocketAddress ccAddress =
                 new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort());
-        this.primaryCcs = addCc(ncConfig.getClusterControllerId(), ccAddress);
+        addCc(ncConfig.getClusterControllerId(), ccAddress);
+        this.primaryCcId = ncConfig.getClusterControllerId();
 
         workQueue.start();
 
@@ -315,10 +309,9 @@
         application.startupCompleted();
     }
 
-    public ClusterControllerRemoteProxy addCc(CcId ccId, InetSocketAddress ccAddress) throws Exception {
-        ClusterControllerRemoteProxy ccProxy;
-        synchronized (ccsMap) {
-            if (ccsMap.containsKey(ccId)) {
+    public CcId addCc(CcId ccId, InetSocketAddress ccAddress) throws Exception {
+        synchronized (ccMap) {
+            if (ccMap.containsKey(ccId)) {
                 throw new IllegalStateException("cc already registered: " + ccId);
             }
             final IIPCEventListener ipcEventListener = new IIPCEventListener() {
@@ -326,53 +319,56 @@
                 public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
                     // we need to re-register in case of NC -> CC connection reset
                     try {
-                        registerNode(ccsMap.get(ccId));
+                        registerNode(ccMap.get(ccId));
                     } catch (Exception e) {
                         LOGGER.log(Level.WARN, "Failed Registering with cc", e);
                         throw new IPCException(e);
                     }
                 }
             };
-            ccProxy = new ClusterControllerRemoteProxy(ccId,
+            ClusterControllerRemoteProxy ccProxy = new ClusterControllerRemoteProxy(ccId,
                     ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
-            registerNode(ccProxy);
-            ccsMap.put(ccId, ccProxy);
+            CcConnection ccc = new CcConnection(ccProxy);
+            registerNode(ccc);
+            ccMap.put(ccId, ccc);
         }
-        return ccProxy;
+        return ccId;
     }
 
     public void makePrimaryCc(CcId ccId) throws Exception {
-        synchronized (ccsMap) {
-            if (!ccsMap.containsKey(ccId)) {
+        synchronized (ccMap) {
+            if (!ccMap.containsKey(ccId)) {
                 throw new IllegalArgumentException("unknown cc: " + ccId);
             }
-            primaryCcs = ccsMap.get(ccId);
+            primaryCcId = ccId;
         }
     }
 
     public void removeCc(CcId ccId) throws Exception {
-        synchronized (ccsMap) {
-            final IClusterController ccs = ccsMap.get(ccId);
-            if (ccs == null) {
+        synchronized (ccMap) {
+            final CcConnection ccc = ccMap.get(ccId);
+            if (ccc == null) {
                 throw new IllegalArgumentException("unknown cc: " + ccId);
             }
-            if (primaryCcs.equals(ccs)) {
+            if (primaryCcId.equals(ccc)) {
                 throw new IllegalStateException("cannot remove primary cc: " + ccId);
             }
             // TODO(mblow): consider how to handle running jobs
-            ccs.unregisterNode(id);
-            Thread hbThread = heartbeatThreads.remove(ccs);
+            ccc.ccs.unregisterNode(id);
+            ccMap.remove(ccId);
+            Thread hbThread = heartbeatThreads.remove(ccId);
             hbThread.interrupt();
-            Timer ccTimer = ccTimers.remove(ccs);
+            Timer ccTimer = ccTimers.remove(ccId);
             if (ccTimer != null) {
                 ccTimer.cancel();
             }
         }
     }
 
-    protected void registerNode(IClusterController ccs) throws Exception {
-        LOGGER.info("Registering with Cluster Controller {}", ccs);
-        registrationPending = true;
+    protected void registerNode(CcConnection ccc) throws Exception {
+        LOGGER.info("Registering with Cluster Controller {}", ccc);
+        CcId ccId = ccc.getCcId();
+        ccc.regPending = true;
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
             gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
@@ -390,53 +386,49 @@
         NetworkAddress messagingAddress =
                 messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
         int allCores = osMXBean.getAvailableProcessors();
-        nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, osMXBean.getName(),
-                osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(),
+        NodeRegistration nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(),
                 runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(),
                 runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
                 runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, application.getCapacity(),
                 PidHelper.getPid(), maxJobId.get());
 
+        IClusterController ccs = ccc.ccs;
         ccs.registerNode(nodeRegistration);
 
-        completeNodeRegistration(ccs);
+        NodeParameters nodeParameters = ccc.waitForRegistrationComplete(getDistributedState());
+        application.onRegisterNode(ccId);
 
         // Start heartbeat generator.
-        if (!heartbeatThreads.containsKey(ccs)) {
+        if (!heartbeatThreads.containsKey(ccId)) {
             Thread heartbeatThread =
                     new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
             heartbeatThread.setPriority(Thread.MAX_PRIORITY);
             heartbeatThread.setDaemon(true);
             heartbeatThread.start();
-            heartbeatThreads.put(ccs, heartbeatThread);
+            heartbeatThreads.put(ccId, heartbeatThread);
         }
-        if (!ccTimers.containsKey(ccs) && nodeParameters.getProfileDumpPeriod() > 0) {
-            Timer ccTimer = new Timer("Timer-" + ccs.getCcId(), true);
+        if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) {
+            Timer ccTimer = new Timer("Timer-" + ccId, true);
             // Schedule profile dump generator.
             ccTimer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
-            ccTimers.put(ccs, ccTimer);
+            ccTimers.put(ccId, ccTimer);
         }
 
-        LOGGER.info("Registering with Cluster Controller {} complete", ccs);
+        LOGGER.info("Registering with Cluster Controller {} complete", ccc);
     }
 
     synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
-        this.nodeParameters = parameters;
-        this.registrationException = exception;
-        this.registrationPending = false;
-        notifyAll();
+        CcConnection ccc = ccMap.get(parameters.getClusterControllerInfo().getCcId());
+        ccc.nodeParameters = parameters;
+        ccc.regEx = exception;
+        ccc.regPending = false;
+        ccc.notifyAll();
     }
 
-    private synchronized void completeNodeRegistration(IClusterController ccs) throws Exception {
-        while (registrationPending) {
-            wait();
-        }
-        if (registrationException != null) {
-            LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception", registrationException);
-            throw registrationException;
-        }
-        serviceCtx.setDistributedState(nodeParameters.getDistributedState());
-        application.onRegisterNode(ccs.getCcId());
+    private ConcurrentHashMap<CcId, Serializable> getDistributedState() {
+        //noinspection unchecked
+        return (ConcurrentHashMap<CcId, Serializable>) serviceCtx.getDistributedState();
     }
 
     private void startApplication() throws Exception {
@@ -478,10 +470,10 @@
                 t.interrupt();
                 InvokeUtil.doUninterruptibly(() -> t.join(1000));
             });
-            synchronized (ccsMap) {
-                ccsMap.values().parallelStream().forEach(ccs -> {
+            synchronized (ccMap) {
+                ccMap.values().parallelStream().forEach(cc -> {
                     try {
-                        ccs.notifyShutdown(id);
+                        cc.ccs.notifyShutdown(id);
                     } catch (Exception e) {
                         LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e);
                     }
@@ -520,13 +512,8 @@
         jobParameterByteStoreMap.remove(jobId);
     }
 
-    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
-        JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
-        if (jpbs == null) {
-            jpbs = new JobParameterByteStore();
-            jobParameterByteStoreMap.put(jobId, jpbs);
-        }
-        return jpbs;
+    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) {
+        return  jobParameterByteStoreMap.computeIfAbsent(jobId, jid -> new JobParameterByteStore());
     }
 
     public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, ActivityClusterGraph acg)
@@ -550,7 +537,7 @@
         }
     }
 
-    public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+    public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) {
         return deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId());
     }
 
@@ -566,16 +553,21 @@
         return partitionManager;
     }
 
+    public CcId getPrimaryCcId() {
+        // TODO(mblow): this can change at any time, need notification framework
+        return primaryCcId;
+    }
+
     public IClusterController getPrimaryClusterController() {
-        return primaryCcs;
+        return getClusterController(primaryCcId);
     }
 
     public IClusterController getClusterController(CcId ccId) {
-        return ccsMap.get(ccId);
+        return ccMap.get(ccId).ccs;
     }
 
-    public NodeParameters getNodeParameters() {
-        return nodeParameters;
+    public NodeParameters getNodeParameters(CcId ccId) {
+        return ccMap.get(ccId).nodeParameters;
     }
 
     @Override
@@ -734,7 +726,7 @@
     }
 
     public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception {
-        ccsMap.get(ccId).sendApplicationMessageToCC(data, deploymentId, id);
+        getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id);
     }
 
     public IDatasetPartitionManager getDatasetPartitionManager() {
@@ -759,4 +751,37 @@
     public Object getApplicationContext() {
         return application.getApplicationContext();
     }
+
+    private static class CcConnection {
+        private final IClusterController ccs;
+        private boolean regPending;
+        private Exception regEx;
+        private NodeParameters nodeParameters;
+
+        private CcConnection(IClusterController ccs) {
+            this.ccs = ccs;
+        }
+
+        private synchronized NodeParameters waitForRegistrationComplete(ConcurrentHashMap<CcId, Serializable> distributedStateMap) throws Exception {
+            while (regPending) {
+                wait();
+            }
+            if (regEx != null) {
+                LOGGER.log(Level.WARN, "Registering with {} failed with exception", this, regEx);
+                throw regEx;
+            }
+            distributedStateMap.put(getCcId(), nodeParameters.getDistributedState());
+            return nodeParameters;
+        }
+
+        @Override
+        public String toString() {
+            return getCcId().toString();
+        }
+
+        public CcId getCcId() {
+            return ccs.getCcId();
+        }
+    }
 }
+
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
index 6a75471..8733022 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IStateDumpHandler;
@@ -50,7 +51,7 @@
 
     public NCServiceContext(NodeControllerService ncs, ServerContext serverCtx, IOManager ioManager, String nodeId,
             MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
-            IApplicationConfig appConfig) throws IOException {
+            IApplicationConfig appConfig) {
         super(serverCtx, appConfig, new HyracksThreadFactory(nodeId));
         this.lccm = lifeCyclecomponentManager;
         this.nodeId = nodeId;
@@ -59,6 +60,7 @@
         this.ncs = ncs;
         this.sdh = lccm::dumpState;
         this.tracer = new Tracer(nodeId, ncs.getConfiguration().getTraceCategories(), new TraceCategoryRegistry());
+        this.distributedState = new ConcurrentHashMap<>();
     }
 
     @Override

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2352
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibcbdfb25d99a11b01ed390ce266f332fb40f17a4
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <mblow@apache.org>

Mime
View raw message