asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject asterixdb git commit: [ASTERIXDB-2019][CLUS] Update cluster state on partitions changes
Date Fri, 04 Aug 2017 22:38:20 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master f94f63d3f -> e0d8e5078


[ASTERIXDB-2019][CLUS] Update cluster state on partitions changes

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Set the cluster to UNUSABLE when no partitions are registered
- Update cluster state after partitions register/de-register
- Reject unregistered nodes queries on CC
- Avoid NPE when trying to send to a node that was de-registered

Change-Id: I7d11733a1dcd86136e157d80517bff4abcfc776b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1918
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e0d8e507
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e0d8e507
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e0d8e507

Branch: refs/heads/master
Commit: e0d8e5078f90823e8dd51052317a7da1c08cc9f9
Parents: f94f63d
Author: Murtadha Hubail <mhubail@apache.org>
Authored: Fri Aug 4 19:55:49 2017 +0300
Committer: Murtadha Hubail <mhubail@apache.org>
Committed: Fri Aug 4 15:31:54 2017 -0700

----------------------------------------------------------------------
 .../message/ExecuteStatementRequestMessage.java | 33 +++++++++++++++-----
 .../asterix/messaging/CCMessageBroker.java      |  8 ++++-
 .../common/cluster/IClusterStateManager.java    | 11 ++++++-
 .../runtime/utils/ClusterStateManager.java      | 15 ++++++++-
 4 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0d8e507/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index fc0c1ff..9faa9e9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -92,6 +92,11 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
         ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService();
         CCApplication ccApp = (CCApplication) ccSrv.getApplication();
         CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker();
+        final String rejectionReason = getRejectionReason(ccSrv);
+        if (rejectionReason != null) {
+            sendRejection(rejectionReason, messageBroker);
+            return;
+        }
         CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager();
         ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang);
         IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
@@ -100,16 +105,9 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
 
         ccSrv.getExecutor().submit(() -> {
             ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
-
             try {
-                final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
-                if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
-                    throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
-                }
-
                 IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
                 List<Statement> statements = parser.parse();
-
                 StringWriter outWriter = new StringWriter(256);
                 PrintWriter outPrinter = new PrintWriter(outWriter);
                 SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
@@ -148,6 +146,27 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
         });
     }
 
+    private String getRejectionReason(ClusterControllerService ccSrv) {
+        if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == null) {
+            return "Node is not registerted with the CC";
+        }
+        final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+        if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+            return "Cannot execute request, cluster is " + clusterState;
+        }
+        return null;
+    }
+
+    private void sendRejection(String reason, CCMessageBroker messageBroker) {
+        ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
+        responseMsg.setError(new Exception(reason));
+        try {
+            messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, e.toString(), e);
+        }
+    }
+
     @Override
     public String toString() {
         return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId, requestNodeId,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0d8e507/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index de2ca11..0eade41 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -69,7 +69,13 @@ public class CCMessageBroker implements ICCMessageBroker {
     public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
         INodeManager nodeManager = ccs.getNodeManager();
         NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
-        state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
+        if (state != null) {
+            state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
+        } else {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Couldn't send message to unregistered node (" + nodeId + ")");
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0d8e507/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a5686fd..30675cd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -100,11 +100,20 @@ public interface IClusterStateManager {
 
     /**
      * Register the specified node partitions with the specified nodeId with this cluster state manager
+     * then calls {@link IClusterStateManager#refreshState()}
+     *
+     * @param nodeId
+     * @param nodePartitions
+     * @throws AsterixException
      */
     void registerNodePartitions(String nodeId, ClusterPartition[] nodePartitions) throws AsterixException;
 
     /**
      * De-register the specified node's partitions from this cluster state manager
+     * then calls {@link IClusterStateManager#refreshState()}
+     *
+     * @param nodeId
+     * @throws HyracksDataException
      */
-    void deregisterNodePartitions(String nodeId);
+    void deregisterNodePartitions(String nodeId) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0d8e507/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 4717a7b..8156a23 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -154,6 +154,12 @@ public class ClusterStateManager implements IClusterStateManager {
     @Override
     public synchronized void refreshState() throws HyracksDataException {
         resetClusterPartitionConstraint();
+        if (clusterPartitions.isEmpty()) {
+            LOGGER.info("Cluster does not have any registered partitions");
+            setState(ClusterState.UNUSABLE);
+            return;
+        }
+
         for (ClusterPartition p : clusterPartitions.values()) {
             if (!p.isActive()) {
                 setState(ClusterState.UNUSABLE);
@@ -368,10 +374,16 @@ public class ClusterStateManager implements IClusterStateManager {
             clusterPartitions.put(nodePartition.getPartitionId(), nodePartition);
         }
         node2PartitionsMap.put(nodeId, nodePartitions);
+        //TODO fix exception propagation from refreshState
+        try {
+            refreshState();
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
     }
 
     @Override
-    public synchronized void deregisterNodePartitions(String nodeId) {
+    public synchronized void deregisterNodePartitions(String nodeId) throws HyracksDataException {
         ClusterPartition[] nodePartitions = node2PartitionsMap.remove(nodeId);
         if (nodePartitions == null) {
             LOGGER.info("deregisterNodePartitions unknown node " + nodeId + " (already removed?)");
@@ -382,6 +394,7 @@ public class ClusterStateManager implements IClusterStateManager {
             for (ClusterPartition nodePartition : nodePartitions) {
                 clusterPartitions.remove(nodePartition.getPartitionId());
             }
+            refreshState();
         }
     }
 


Mime
View raw message