asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Murtadha Hubail (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [ASTERIXDB-2019][CLUS] Update cluster state on partitions ch...
Date Fri, 04 Aug 2017 16:56:13 GMT
Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1918

Change subject: [ASTERIXDB-2019][CLUS] Update cluster state on partitions changes
......................................................................

[ASTERIXDB-2019][CLUS] Update cluster state on partitions changes

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Set the cluster to UNUSABLE when no partitions are registered
- Update cluster state after partitions register/de-register
- Reject unregistered nodes queries on CC
- Avoid NPE when trying to send to a node that was de-registered

Change-Id: I7d11733a1dcd86136e157d80517bff4abcfc776b
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
4 files changed, 57 insertions(+), 10 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/18/1918/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index fc0c1ff..9faa9e9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -92,6 +92,11 @@
         ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService();
         CCApplication ccApp = (CCApplication) ccSrv.getApplication();
         CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker();
+        final String rejectionReason = getRejectionReason(ccSrv);
+        if (rejectionReason != null) {
+            sendRejection(rejectionReason, messageBroker);
+            return;
+        }
         CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager();
         ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang);
         IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
@@ -100,16 +105,9 @@
 
         ccSrv.getExecutor().submit(() -> {
             ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
-
             try {
-                final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
-                if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
-                    throw new IllegalStateException("Cannot execute request, cluster is "
+ clusterState);
-                }
-
                 IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
                 List<Statement> statements = parser.parse();
-
                 StringWriter outWriter = new StringWriter(256);
                 PrintWriter outPrinter = new PrintWriter(outWriter);
                 SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
@@ -148,6 +146,27 @@
         });
     }
 
+    private String getRejectionReason(ClusterControllerService ccSrv) {
+        if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == null) {
+            return "Node is not registerted with the CC";
+        }
+        final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+        if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+            return "Cannot execute request, cluster is " + clusterState;
+        }
+        return null;
+    }
+
+    private void sendRejection(String reason, CCMessageBroker messageBroker) {
+        ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
+        responseMsg.setError(new Exception(reason));
+        try {
+            messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, e.toString(), e);
+        }
+    }
+
     @Override
     public String toString() {
         return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId,
requestNodeId,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index de2ca11..0eade41 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -69,7 +69,13 @@
     public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws
Exception {
         INodeManager nodeManager = ccs.getNodeManager();
         NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
-        state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg),
null, nodeId);
+        if (state != null) {
+            state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg),
null, nodeId);
+        } else {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Couldn't send message to unregistered node (" + nodeId +
")");
+            }
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a5686fd..30675cd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -100,11 +100,20 @@
 
     /**
      * Register the specified node partitions with the specified nodeId with this cluster
state manager
+     * then calls {@link IClusterStateManager#refreshState()}
+     *
+     * @param nodeId
+     * @param nodePartitions
+     * @throws AsterixException
      */
     void registerNodePartitions(String nodeId, ClusterPartition[] nodePartitions) throws
AsterixException;
 
     /**
      * De-register the specified node's partitions from this cluster state manager
+     * then calls {@link IClusterStateManager#refreshState()}
+     *
+     * @param nodeId
+     * @throws HyracksDataException
      */
-    void deregisterNodePartitions(String nodeId);
+    void deregisterNodePartitions(String nodeId) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 4717a7b..8156a23 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -154,6 +154,12 @@
     @Override
     public synchronized void refreshState() throws HyracksDataException {
         resetClusterPartitionConstraint();
+        if (clusterPartitions.isEmpty()) {
+            LOGGER.info("Cluster does not have any registered partitions");
+            setState(ClusterState.UNUSABLE);
+            return;
+        }
+
         for (ClusterPartition p : clusterPartitions.values()) {
             if (!p.isActive()) {
                 setState(ClusterState.UNUSABLE);
@@ -368,10 +374,16 @@
             clusterPartitions.put(nodePartition.getPartitionId(), nodePartition);
         }
         node2PartitionsMap.put(nodeId, nodePartitions);
+        //TODO fix exception propagation from refreshState
+        try {
+            refreshState();
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
     }
 
     @Override
-    public synchronized void deregisterNodePartitions(String nodeId) {
+    public synchronized void deregisterNodePartitions(String nodeId) throws HyracksDataException
{
         ClusterPartition[] nodePartitions = node2PartitionsMap.remove(nodeId);
         if (nodePartitions == null) {
             LOGGER.info("deregisterNodePartitions unknown node " + nodeId + " (already removed?)");
@@ -382,6 +394,7 @@
             for (ClusterPartition nodePartition : nodePartitions) {
                 clusterPartitions.remove(nodePartition.getPartitionId());
             }
+            refreshState();
         }
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1918
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7d11733a1dcd86136e157d80517bff4abcfc776b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhubail@apache.org>

Mime
View raw message