Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 024CC200B86 for ; Sat, 3 Sep 2016 14:44:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 00EA7160ACF; Sat, 3 Sep 2016 12:44:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DA996160ACD for ; Sat, 3 Sep 2016 14:44:47 +0200 (CEST) Received: (qmail 78359 invoked by uid 500); 3 Sep 2016 12:44:47 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 78289 invoked by uid 99); 3 Sep 2016 12:44:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 03 Sep 2016 12:44:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CF7ACE0BD9; Sat, 3 Sep 2016 12:44:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amoudi@apache.org To: commits@asterixdb.apache.org Date: Sat, 03 Sep 2016 12:44:48 -0000 Message-Id: In-Reply-To: <3be4e753beed4e24a76c863a93fe7ff4@git.apache.org> References: <3be4e753beed4e24a76c863a93fe7ff4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/6] asterixdb git commit: Refactor Messaging archived-at: Sat, 03 Sep 2016 12:44:50 -0000 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java deleted file mode 100644 index 5acf9ae..0000000 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java +++ /dev/null @@ -1,690 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.om.util; - -import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; - -import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; -import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.config.AsterixReplicationProperties; -import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage; -import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage; -import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage; -import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage; -import org.apache.asterix.common.messaging.ReplicaEventMessage; -import org.apache.asterix.common.messaging.TakeoverMetadataNodeRequestMessage; -import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage; -import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage; -import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage; -import org.apache.asterix.common.messaging.api.ICCMessageBroker; -import org.apache.asterix.common.replication.NodeFailbackPlan; -import org.apache.asterix.common.replication.NodeFailbackPlan.FailbackPlanState; -import org.apache.asterix.event.schema.cluster.Cluster; -import org.apache.asterix.event.schema.cluster.Node; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; -import org.json.JSONException; -import org.json.JSONObject; - -/** - * A holder 
class for properties related to the Asterix cluster. - */ - -public class AsterixClusterProperties { - /** - * TODO: currently after instance restarts we require all nodes to join again, otherwise the cluster wont be ACTIVE. - * we may overcome this by storing the cluster state before the instance shutdown and using it on startup to identify - * the nodes that are expected the join. - */ - - private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName()); - public static final AsterixClusterProperties INSTANCE = new AsterixClusterProperties(); - public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml"; - - private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address"; - private static final String IO_DEVICES = "iodevices"; - private static final String DEFAULT_STORAGE_DIR_NAME = "storage"; - private Map> activeNcConfiguration = new HashMap>(); - - private final Cluster cluster; - private ClusterState state = ClusterState.UNUSABLE; - - private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint; - - private boolean globalRecoveryCompleted = false; - - private Map node2PartitionsMap = null; - private SortedMap clusterPartitions = null; - private Map pendingTakeoverRequests = null; - - private long clusterRequestId = 0; - private String currentMetadataNode = null; - private boolean metadataNodeActive = false; - private boolean autoFailover = false; - private boolean replicationEnabled = false; - private Set failedNodes = new HashSet<>(); - private LinkedList pendingProcessingFailbackPlans; - private Map planId2FailbackPlanMap; - - private AsterixClusterProperties() { - InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE); - if (is != null) { - try { - JAXBContext ctx = JAXBContext.newInstance(Cluster.class); - Unmarshaller unmarshaller = ctx.createUnmarshaller(); - cluster = (Cluster) unmarshaller.unmarshal(is); - } catch (JAXBException e) { - 
throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE); - } - } else { - cluster = null; - } - // if this is the CC process - if (AsterixAppContextInfo.getInstance() != null) { - if (AsterixAppContextInfo.getInstance().getCCApplicationContext() != null) { - node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions(); - clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions(); - currentMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName(); - replicationEnabled = isReplicationEnabled(); - autoFailover = isAutoFailoverEnabled(); - if (autoFailover) { - pendingTakeoverRequests = new HashMap<>(); - pendingProcessingFailbackPlans = new LinkedList<>(); - planId2FailbackPlanMap = new HashMap<>(); - } - } - } - } - - public synchronized void removeNCConfiguration(String nodeId) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Removing configuration parameters for node id " + nodeId); - } - activeNcConfiguration.remove(nodeId); - - //if this node was waiting for failback and failed before it completed - if (failedNodes.contains(nodeId)) { - if (autoFailover) { - notifyFailbackPlansNodeFailure(nodeId); - revertFailedFailbackPlanEffects(); - } - } else { - //an active node failed - failedNodes.add(nodeId); - if (nodeId.equals(currentMetadataNode)) { - metadataNodeActive = false; - LOGGER.info("Metadata node is now inactive"); - } - updateNodePartitions(nodeId, false); - if (replicationEnabled) { - notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE); - if (autoFailover) { - notifyFailbackPlansNodeFailure(nodeId); - requestPartitionsTakeover(nodeId); - } - } - } - } - - public synchronized void addNCConfiguration(String nodeId, Map configuration) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Registering configuration parameters for node id " + nodeId); - } - 
activeNcConfiguration.put(nodeId, configuration); - - //a node trying to come back after failure - if (failedNodes.contains(nodeId)) { - if (autoFailover) { - prepareFailbackPlan(nodeId); - return; - } else { - //a node completed local or remote recovery and rejoined - failedNodes.remove(nodeId); - if (replicationEnabled) { - //notify other replica to reconnect to this node - notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN); - } - } - } - - if (nodeId.equals(currentMetadataNode)) { - metadataNodeActive = true; - LOGGER.info("Metadata node is now active"); - } - updateNodePartitions(nodeId, true); - } - - private synchronized void updateNodePartitions(String nodeId, boolean added) { - ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId); - // if this isn't a storage node, it will not have cluster partitions - if (nodePartitions != null) { - for (ClusterPartition p : nodePartitions) { - // set the active node for this node's partitions - p.setActive(added); - if (added) { - p.setActiveNodeId(nodeId); - } - } - resetClusterPartitionConstraint(); - updateClusterState(); - } - } - - private synchronized void updateClusterState() { - for (ClusterPartition p : clusterPartitions.values()) { - if (!p.isActive()) { - state = ClusterState.UNUSABLE; - LOGGER.info("Cluster is in UNUSABLE state"); - return; - } - } - //if all storage partitions are active as well as the metadata node, then the cluster is active - if (metadataNodeActive) { - state = ClusterState.ACTIVE; - LOGGER.info("Cluster is now ACTIVE"); - //start global recovery - AsterixAppContextInfo.getInstance().getGlobalRecoveryManager().startGlobalRecovery(); - if (autoFailover) { - //if there are any pending failback requests, process them - if (pendingProcessingFailbackPlans.size() > 0) { - processPendingFailbackPlans(); - } - } - } else { - requestMetadataNodeTakeover(); - } - } - - /** - * Returns the number of IO devices configured for a Node Controller - * - * @param nodeId - * unique 
identifier of the Node Controller - * @return number of IO devices. -1 if the node id is not valid. A node id - * is not valid if it does not correspond to the set of registered - * Node Controllers. - */ - public int getNumberOfIODevices(String nodeId) { - String[] ioDevs = getIODevices(nodeId); - return ioDevs == null ? -1 : ioDevs.length; - } - - /** - * Returns the IO devices configured for a Node Controller - * - * @param nodeId - * unique identifier of the Node Controller - * @return a list of IO devices. null if node id is not valid. A node id is not valid - * if it does not correspond to the set of registered Node Controllers. - */ - public synchronized String[] getIODevices(String nodeId) { - Map ncConfig = activeNcConfiguration.get(nodeId); - if (ncConfig == null) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Configuration parameters for nodeId " + nodeId - + " not found. The node has not joined yet or has left."); - } - return null; - } - return ncConfig.get(IO_DEVICES).split(","); - } - - public ClusterState getState() { - return state; - } - - public Cluster getCluster() { - return cluster; - } - - public synchronized Node getAvailableSubstitutionNode() { - List subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode(); - return subNodes == null || subNodes.isEmpty() ? 
null : subNodes.get(0); - } - - public synchronized Set getParticipantNodes() { - Set participantNodes = new HashSet(); - for (String pNode : activeNcConfiguration.keySet()) { - participantNodes.add(pNode); - } - return participantNodes; - } - - public synchronized AlgebricksAbsolutePartitionConstraint getClusterLocations() { - if (clusterPartitionConstraint == null) { - resetClusterPartitionConstraint(); - } - return clusterPartitionConstraint; - } - - private synchronized void resetClusterPartitionConstraint() { - ArrayList clusterActiveLocations = new ArrayList<>(); - for (ClusterPartition p : clusterPartitions.values()) { - if (p.isActive()) { - clusterActiveLocations.add(p.getActiveNodeId()); - } - } - clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint( - clusterActiveLocations.toArray(new String[] {})); - } - - public boolean isGlobalRecoveryCompleted() { - return globalRecoveryCompleted; - } - - public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) { - this.globalRecoveryCompleted = globalRecoveryCompleted; - } - - public boolean isClusterActive() { - if (cluster == null) { - // this is a virtual cluster - return true; - } - return state == ClusterState.ACTIVE; - } - - public static int getNumberOfNodes() { - return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size(); - } - - public synchronized ClusterPartition[] getNodePartitions(String nodeId) { - return node2PartitionsMap.get(nodeId); - } - - public synchronized int getNodePartitionsCount(String node) { - if (node2PartitionsMap.containsKey(node)) { - return node2PartitionsMap.get(node).length; - } - return 0; - } - - public synchronized ClusterPartition[] getClusterPartitons() { - ArrayList partitons = new ArrayList<>(); - for (ClusterPartition partition : clusterPartitions.values()) { - partitons.add(partition); - } - return partitons.toArray(new ClusterPartition[] {}); - } - - public String getStorageDirectoryName() { - if (cluster != 
null) { - return cluster.getStore(); - } - // virtual cluster without cluster config file - return DEFAULT_STORAGE_DIR_NAME; - } - - private synchronized void requestPartitionsTakeover(String failedNodeId) { - //replica -> list of partitions to takeover - Map> partitionRecoveryPlan = new HashMap<>(); - AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance() - .getReplicationProperties(); - - //collect the partitions of the failed NC - List lostPartitions = getNodeAssignedPartitions(failedNodeId); - if (lostPartitions.size() > 0) { - for (ClusterPartition partition : lostPartitions) { - //find replicas for this partitions - Set partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId()); - //find a replica that is still active - for (String replica : partitionReplicas) { - //TODO (mhubail) currently this assigns the partition to the first found active replica. - //It needs to be modified to consider load balancing. - if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) { - if (!partitionRecoveryPlan.containsKey(replica)) { - List replicaPartitions = new ArrayList<>(); - replicaPartitions.add(partition.getPartitionId()); - partitionRecoveryPlan.put(replica, replicaPartitions); - } else { - partitionRecoveryPlan.get(replica).add(partition.getPartitionId()); - } - } - break; - } - } - - if (partitionRecoveryPlan.size() == 0) { - //no active replicas were found for the failed node - LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions); - return; - } else { - LOGGER.info("Partitions to recover: " + lostPartitions); - } - ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance() - .getCCApplicationContext().getMessageBroker(); - //For each replica, send a request to takeover the assigned partitions - for (String replica : partitionRecoveryPlan.keySet()) { - Integer[] partitionsToTakeover = 
partitionRecoveryPlan.get(replica).toArray(new Integer[] {}); - long requestId = clusterRequestId++; - TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId, - replica, partitionsToTakeover); - pendingTakeoverRequests.put(requestId, takeoverRequest); - try { - messageBroker.sendApplicationMessageToNC(takeoverRequest, replica); - } catch (Exception e) { - /** - * if we fail to send the request, it means the NC we tried to send the request to - * has failed. When the failure notification arrives, we will send any pending request - * that belongs to the failed NC to a different active replica. - */ - LOGGER.warning("Failed to send takeover request: " + takeoverRequest); - e.printStackTrace(); - } - } - } - } - - private synchronized List getNodeAssignedPartitions(String nodeId) { - List nodePartitions = new ArrayList<>(); - for (ClusterPartition partition : clusterPartitions.values()) { - if (partition.getActiveNodeId().equals(nodeId)) { - nodePartitions.add(partition); - } - } - /** - * if there is any pending takeover request that this node was supposed to handle, - * it needs to be sent to a different replica - */ - List failedTakeoverRequests = new ArrayList<>(); - for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) { - if (request.getNodeId().equals(nodeId)) { - for (Integer partitionId : request.getPartitions()) { - nodePartitions.add(clusterPartitions.get(partitionId)); - } - failedTakeoverRequests.add(request.getId()); - } - } - - //remove failed requests - for (Long requestId : failedTakeoverRequests) { - pendingTakeoverRequests.remove(requestId); - } - return nodePartitions; - } - - private synchronized void requestMetadataNodeTakeover() { - //need a new node to takeover metadata node - ClusterPartition metadataPartiton = AsterixAppContextInfo.getInstance().getMetadataProperties() - .getMetadataPartition(); - //request the metadataPartition node to register itself as the metadata node 
- TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage(); - ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance() - .getCCApplicationContext().getMessageBroker(); - try { - messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId()); - } catch (Exception e) { - /** - * if we fail to send the request, it means the NC we tried to send the request to - * has failed. When the failure notification arrives, a new NC will be assigned to - * the metadata partition and a new metadata node takeover request will be sent to it. - */ - LOGGER.warning("Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId()); - e.printStackTrace(); - } - } - - public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage reponse) { - for (Integer partitonId : reponse.getPartitions()) { - ClusterPartition partition = clusterPartitions.get(partitonId); - partition.setActive(true); - partition.setActiveNodeId(reponse.getNodeId()); - } - pendingTakeoverRequests.remove(reponse.getRequestId()); - resetClusterPartitionConstraint(); - updateClusterState(); - } - - public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage reponse) { - currentMetadataNode = reponse.getNodeId(); - metadataNodeActive = true; - LOGGER.info("Current metadata node: " + currentMetadataNode); - updateClusterState(); - } - - private synchronized void prepareFailbackPlan(String failingBackNodeId) { - NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId); - pendingProcessingFailbackPlans.add(plan); - planId2FailbackPlanMap.put(plan.getPlanId(), plan); - - //get all partitions this node requires to resync - AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance() - .getReplicationProperties(); - Set nodeReplicas = 
replicationProperties.getNodeReplicationClients(failingBackNodeId); - for (String replicaId : nodeReplicas) { - ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId); - for (ClusterPartition partition : nodePartitions) { - plan.addParticipant(partition.getActiveNodeId()); - /** - * if the partition original node is the returning node, - * add it to the list of the partitions which will be failed back - */ - if (partition.getNodeId().equals(failingBackNodeId)) { - plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId()); - } - } - } - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Prepared Failback plan: " + plan.toString()); - } - - processPendingFailbackPlans(); - } - - private synchronized void processPendingFailbackPlans() { - /** - * if the cluster state is not ACTIVE, then failbacks should not be processed - * since some partitions are not active - */ - if (state == ClusterState.ACTIVE) { - while (!pendingProcessingFailbackPlans.isEmpty()) { - //take the first pending failback plan - NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop(); - /** - * A plan at this stage will be in one of two states: - * 1. PREPARING -> the participants were selected but we haven't sent any request. - * 2. 
PENDING_ROLLBACK -> a participant failed before we send any requests - */ - if (plan.getState() == FailbackPlanState.PREPARING) { - //set the partitions that will be failed back as inactive - String failbackNode = plan.getNodeId(); - for (Integer partitionId : plan.getPartitionsToFailback()) { - ClusterPartition clusterPartition = clusterPartitions.get(partitionId); - clusterPartition.setActive(false); - //partition expected to be returned to the failing back node - clusterPartition.setActiveNodeId(failbackNode); - } - - /** - * if the returning node is the original metadata node, - * then metadata node will change after the failback completes - */ - String originalMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties() - .getMetadataNodeName(); - if (originalMetadataNode.equals(failbackNode)) { - plan.setNodeToReleaseMetadataManager(currentMetadataNode); - currentMetadataNode = ""; - metadataNodeActive = false; - } - - //force new jobs to wait - state = ClusterState.REBALANCING; - - ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance() - .getCCApplicationContext().getMessageBroker(); - //send requests to other nodes to complete on-going jobs and prepare partitions for failback - Set planFailbackRequests = plan.getPlanFailbackRequests(); - for (PreparePartitionsFailbackRequestMessage request : planFailbackRequests) { - try { - messageBroker.sendApplicationMessageToNC(request, request.getNodeID()); - plan.addPendingRequest(request); - } catch (Exception e) { - LOGGER.warning("Failed to send failback request to: " + request.getNodeID()); - e.printStackTrace(); - plan.notifyNodeFailure(request.getNodeID()); - revertFailedFailbackPlanEffects(); - break; - } - } - /** - * wait until the current plan is completed before processing the next plan. - * when the current one completes or is reverted, the cluster state will be - * ACTIVE again, and the next failback plan (if any) will be processed. 
- */ - break; - } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - //this plan failed before sending any requests -> nothing to rollback - planId2FailbackPlanMap.remove(plan.getPlanId()); - } - } - } - } - - public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) { - NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId()); - plan.markRequestCompleted(msg.getRequestId()); - /** - * A plan at this stage will be in one of three states: - * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait). - * 2. PENDING_COMPLETION -> all responses received (time to send completion request). - * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert). - */ - if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) { - CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage(); - - //send complete resync and takeover partitions to the failing back node - ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance() - .getCCApplicationContext().getMessageBroker(); - try { - messageBroker.sendApplicationMessageToNC(request, request.getNodeId()); - } catch (Exception e) { - LOGGER.warning("Failed to send complete failback request to: " + request.getNodeId()); - e.printStackTrace(); - notifyFailbackPlansNodeFailure(request.getNodeId()); - revertFailedFailbackPlanEffects(); - } - } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - revertFailedFailbackPlanEffects(); - } - } - - public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage reponse) { - /** - * the failback plan completed successfully: - * Remove all references to it. - * Remove the the failing back node from the failed nodes list. - * Notify its replicas to reconnect to it. - * Set the failing back node partitions as active. 
- */ - NodeFailbackPlan plan = planId2FailbackPlanMap.remove(reponse.getPlanId()); - String nodeId = plan.getNodeId(); - failedNodes.remove(nodeId); - //notify impacted replicas they can reconnect to this node - notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN); - updateNodePartitions(nodeId, true); - } - - private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) { - AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance() - .getReplicationProperties(); - Set remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId); - String nodeIdAddress = ""; - //in case the node joined with a new IP address, we need to send it to the other replicas - if (event == ClusterEventType.NODE_JOIN) { - nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY); - } - - ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event); - ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance() - .getCCApplicationContext().getMessageBroker(); - for (String replica : remoteReplicas) { - //if the remote replica is alive, send the event - if (activeNcConfiguration.containsKey(replica)) { - try { - messageBroker.sendApplicationMessageToNC(msg, replica); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - } - - private synchronized void revertFailedFailbackPlanEffects() { - Iterator iterator = planId2FailbackPlanMap.values().iterator(); - while (iterator.hasNext()) { - NodeFailbackPlan plan = iterator.next(); - if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) { - //TODO if the failing back node is still active, notify it to construct a new plan for it - iterator.remove(); - - //reassign the partitions that were supposed to be failed back to an active replica - requestPartitionsTakeover(plan.getNodeId()); - } - } - } - - private synchronized void notifyFailbackPlansNodeFailure(String nodeId) { - Iterator iterator = 
planId2FailbackPlanMap.values().iterator(); - while (iterator.hasNext()) { - NodeFailbackPlan plan = iterator.next(); - plan.notifyNodeFailure(nodeId); - } - } - - public synchronized boolean isMetadataNodeActive() { - return metadataNodeActive; - } - - public boolean isReplicationEnabled() { - if (cluster != null && cluster.getDataReplication() != null) { - return cluster.getDataReplication().isEnabled(); - } - return false; - } - - public boolean isAutoFailoverEnabled() { - return isReplicationEnabled() && cluster.getDataReplication().isAutoFailover(); - } - - public synchronized JSONObject getClusterStateDescription() throws JSONException { - JSONObject stateDescription = new JSONObject(); - stateDescription.put("State", state.name()); - stateDescription.put("Metadata_Node", currentMetadataNode); - for (ClusterPartition partition : clusterPartitions.values()) { - stateDescription.put("partition_" + partition.getPartitionId(), partition.getActiveNodeId()); - } - return stateDescription; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java deleted file mode 100644 index 0e9aa0c..0000000 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.om.util; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hyracks.control.cc.ClusterControllerService; - -/** - * Utility class for obtaining information on the set of Hyracks NodeController - * processes that are running on a given host. - */ -public class AsterixRuntimeUtil { - - public static Set getNodeControllersOnIP(InetAddress ipAddress) throws Exception { - Map> nodeControllerInfo = getNodeControllerMap(); - Set nodeControllersAtLocation = nodeControllerInfo.get(ipAddress); - return nodeControllersAtLocation; - } - - public static List getAllNodeControllers() throws Exception { - Collection> nodeControllersCollection = getNodeControllerMap().values(); - List nodeControllers = new ArrayList(); - for (Set ncCollection : nodeControllersCollection) { - nodeControllers.addAll(ncCollection); - } - return nodeControllers; - } - - public static Map> getNodeControllerMap() throws Exception { - Map> map = new HashMap>(); - AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext().getIPAddressNodeMap(map); - return map; - } - - public static void getNodeControllerMap(Map> map) throws Exception { - ClusterControllerService ccs = (ClusterControllerService) AsterixAppContextInfo.getInstance() - .getCCApplicationContext().getControllerService(); - map.putAll(ccs.getIpAddressNodeNameMap()); - } -} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index d247490..5a88a9f 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -59,6 +59,7 @@ import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.LogSource; import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.common.utils.TransactionUtil; import org.apache.asterix.replication.functions.ReplicaFilesRequest; import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest; @@ -69,7 +70,6 @@ import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask; import org.apache.asterix.replication.storage.LSMComponentProperties; import org.apache.asterix.replication.storage.LSMIndexFileProperties; import org.apache.asterix.replication.storage.ReplicaResourcesManager; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.application.INCApplicationContext; import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java index 5ba6ad2..53bd164 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java @@ -65,6 +65,7 @@ import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.event.schema.cluster.Node; import org.apache.asterix.replication.functions.ReplicaFilesRequest; import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest; @@ -75,7 +76,6 @@ import org.apache.asterix.replication.logging.TxnLogReplicator; import org.apache.asterix.replication.storage.LSMComponentProperties; import org.apache.asterix.replication.storage.LSMIndexFileProperties; import org.apache.asterix.replication.storage.ReplicaResourcesManager; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.replication.IReplicationJob; 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java index 4da5fd4..46a2076 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java @@ -38,9 +38,9 @@ import org.apache.asterix.common.replication.IRemoteRecoveryManager; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.IRecoveryManager; -import org.apache.asterix.om.util.AsterixClusterProperties; -import org.apache.asterix.replication.storage.ReplicaResourcesManager; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.asterix.replication.storage.ReplicaResourcesManager; +import org.apache.asterix.runtime.util.AsterixClusterProperties; public class RemoteRecoveryManager implements IRemoteRecoveryManager { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java index 41fc0b8..01d6db7 100644 --- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java @@ -40,10 +40,10 @@ import java.util.logging.Logger; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.AsterixMetadataProperties; import org.apache.asterix.common.replication.IReplicaResourcesManager; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil; -import org.apache.asterix.om.util.AsterixClusterProperties; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.asterix.runtime.util.AsterixClusterProperties; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.common.file.ILocalResourceRepository; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java new file mode 100644 index 0000000..2eb1be9 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.message; + +import org.apache.asterix.common.messaging.AbstractApplicationMessage; + +public abstract class AbstractFailbackPlanMessage extends AbstractApplicationMessage { + + private static final long serialVersionUID = 1L; + protected final long planId; + protected final int requestId; + + public AbstractFailbackPlanMessage(long planId, int requestId) { + this.planId = planId; + this.requestId = requestId; + } + + public long getPlanId() { + return planId; + } + + public int getRequestId() { + return requestId; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java new file mode 100644 index 0000000..c96bcb8 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.message; + +import java.io.IOException; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.replication.IRemoteRecoveryManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(CompleteFailbackRequestMessage.class.getName()); + private final Set partitions; + private final String nodeId; + + public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set partitions) { + super(planId, requestId); + this.nodeId = nodeId; + this.partitions = partitions; + } + + public Set getPartitions() { + return partitions; + } + + public String getNodeId() { + return nodeId; + } + + @Override + public String toString() { + StringBuilder sb = new 
StringBuilder(); + sb.append("Plan ID: " + planId); + sb.append(" Node ID: " + nodeId); + sb.append(" Partitions: " + partitions); + return sb.toString(); + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAsterixAppRuntimeContext appContext = + (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker(); + HyracksDataException hde = null; + try { + IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager(); + remoteRecoeryManager.completeFailbackProcess(); + } catch (IOException | InterruptedException e) { + LOGGER.log(Level.SEVERE, "Failure during completion of failback process", e); + hde = ExceptionUtils.convertToHyracksDataException(e); + } finally { + CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(planId, + requestId, partitions); + try { + broker.sendMessageToCC(reponse, null); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Failure sending message to CC", e); + hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e); + } + } + if (hde != null) { + throw hde; + } + } + + @Override + public String type() { + return "COMPLETE_FAILBACK_REQUEST"; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java new file mode 100644 index 0000000..3c4b58c --- /dev/null +++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.message; + +import java.util.Set; + +import org.apache.asterix.runtime.util.AsterixClusterProperties; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; + +public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage { + + private static final long serialVersionUID = 1L; + private final Set partitions; + + public CompleteFailbackResponseMessage(long planId, int requestId, Set partitions) { + super(planId, requestId); + this.partitions = partitions; + } + + public Set getPartitions() { + return partitions; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Plan ID: " + planId); + sb.append(" Partitions: " + partitions); + return sb.toString(); + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException { + AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse(this); + } + + @Override + public String type() { + return 
"COMPLETE_FAILBACK_RESPONSE"; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java new file mode 100644 index 0000000..24f5fe8 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.message; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class NodeFailbackPlan { + + public enum FailbackPlanState { + /** + * Initial state while selecting the nodes that will participate + * in the node failback plan. 
+ */ + PREPARING, + /** + * Once a pending {@link PreparePartitionsFailbackRequestMessage} request is added, + * the state is changed from PREPARING to PENDING_PARTICIPANT_REPONSE to indicate + * that a response is expected and must be waited for. + */ + PENDING_PARTICIPANT_REPONSE, + /** + * Upon receiving the last {@link PreparePartitionsFailbackResponseMessage} response, + * the state changes from PENDING_PARTICIPANT_REPONSE to PENDING_COMPLETION to indicate + * the need to send {@link CompleteFailbackRequestMessage} to the failing back node. + */ + PENDING_COMPLETION, + /** + * if any of the participants fail or the failing back node itself fails during + * any of these states (PREPARING, PENDING_PARTICIPANT_REPONSE, PENDING_COMPLETION), + * the state is changed to FAILED. + */ + FAILED, + /** + * if the state is FAILED, and all pending responses (if any) have been received, + * the state changes from FAILED to PENDING_ROLLBACK to indicate the need to revert + * the effects of this plan (if any). 
+ */ + PENDING_ROLLBACK + } + + private static long planIdGenerator = 0; + private long planId; + private final String nodeId; + private final Set participants; + private final Map partition2nodeMap; + private String nodeToReleaseMetadataManager; + private int requestId; + private Map pendingRequests; + private FailbackPlanState state; + + public static NodeFailbackPlan createPlan(String nodeId) { + return new NodeFailbackPlan(planIdGenerator++, nodeId); + } + + private NodeFailbackPlan(long planId, String nodeId) { + this.planId = planId; + this.nodeId = nodeId; + participants = new HashSet<>(); + partition2nodeMap = new HashMap<>(); + pendingRequests = new HashMap<>(); + state = FailbackPlanState.PREPARING; + } + + public synchronized void addPartitionToFailback(int partitionId, String currentActiveNode) { + partition2nodeMap.put(partitionId, currentActiveNode); + } + + public synchronized void addParticipant(String nodeId) { + participants.add(nodeId); + } + + public synchronized void notifyNodeFailure(String failedNode) { + if (participants.contains(failedNode)) { + if (state == FailbackPlanState.PREPARING) { + state = FailbackPlanState.FAILED; + } else if (state == FailbackPlanState.PENDING_PARTICIPANT_REPONSE) { + /** + * if there is any pending request from this failed node, + * it should be marked as completed and the plan should be marked as failed + */ + Set failedRequests = new HashSet<>(); + for (PreparePartitionsFailbackRequestMessage request : pendingRequests.values()) { + if (request.getNodeID().equals(failedNode)) { + failedRequests.add(request.getRequestId()); + } + } + + if (!failedRequests.isEmpty()) { + state = FailbackPlanState.FAILED; + for (Integer failedRequestId : failedRequests) { + markRequestCompleted(failedRequestId); + } + } + } + } else if (nodeId.equals(failedNode)) { + //if the failing back node is the failed node itself + state = FailbackPlanState.FAILED; + updateState(); + } + } + + public synchronized Set 
getPartitionsToFailback() { + return new HashSet<>(partition2nodeMap.keySet()); + } + + public synchronized void addPendingRequest(PreparePartitionsFailbackRequestMessage msg) { + //if this is the first request + if (pendingRequests.size() == 0) { + state = FailbackPlanState.PENDING_PARTICIPANT_REPONSE; + } + pendingRequests.put(msg.getRequestId(), msg); + } + + public synchronized void markRequestCompleted(int requestId) { + pendingRequests.remove(requestId); + updateState(); + } + + private void updateState() { + if (pendingRequests.size() == 0) { + switch (state) { + case PREPARING: + case FAILED: + state = FailbackPlanState.PENDING_ROLLBACK; + break; + case PENDING_PARTICIPANT_REPONSE: + state = FailbackPlanState.PENDING_COMPLETION; + break; + default: + break; + } + } + } + + public synchronized Set getPlanFailbackRequests() { + Set node2Partitions = new HashSet<>(); + /** + * for each participant, construct a request with the partitions + * that will be failed back or flushed. + */ + for (String participant : participants) { + Set partitionToPrepareForFailback = new HashSet<>(); + for (Map.Entry entry : partition2nodeMap.entrySet()) { + if (entry.getValue().equals(participant)) { + partitionToPrepareForFailback.add(entry.getKey()); + } + } + PreparePartitionsFailbackRequestMessage msg = new PreparePartitionsFailbackRequestMessage(planId, + requestId++, participant, partitionToPrepareForFailback); + if (participant.equals(nodeToReleaseMetadataManager)) { + msg.setReleaseMetadataNode(true); + } + node2Partitions.add(msg); + } + return node2Partitions; + } + + public synchronized CompleteFailbackRequestMessage getCompleteFailbackRequestMessage() { + return new CompleteFailbackRequestMessage(planId, requestId++, nodeId, getPartitionsToFailback()); + } + + public String getNodeId() { + return nodeId; + } + + public long getPlanId() { + return planId; + } + + public void setNodeToReleaseMetadataManager(String nodeToReleaseMetadataManager) { + 
this.nodeToReleaseMetadataManager = nodeToReleaseMetadataManager; + } + + public synchronized FailbackPlanState getState() { + return state; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Plan ID: " + planId); + sb.append(" Failing back node: " + nodeId); + sb.append(" Participants: " + participants); + sb.append(" Partitions to Failback: " + partition2nodeMap.keySet()); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java new file mode 100644 index 0000000..e3b9fbe --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.message; + +import java.rmi.RemoteException; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(PreparePartitionsFailbackRequestMessage.class.getName()); + private final Set partitions; + private boolean releaseMetadataNode = false; + private final String nodeID; + + public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set partitions) { + super(planId, requestId); + this.nodeID = nodeId; + this.partitions = partitions; + } + + public Set getPartitions() { + return partitions; + } + + public boolean isReleaseMetadataNode() { + return releaseMetadataNode; + } + + public void setReleaseMetadataNode(boolean releaseMetadataNode) { + this.releaseMetadataNode = releaseMetadataNode; + } + + public String getNodeID() { + return nodeID; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Plan ID: " + planId); + sb.append(" Partitions: " + partitions); + sb.append(" releaseMetadataNode: " + releaseMetadataNode); + return sb.toString(); + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAsterixAppRuntimeContext appContext = + 
(IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker(); + /** + * if the metadata partition will be failed back + * we need to flush and close all datasets including metadata datasets + * otherwise we need to close all non-metadata datasets and flush metadata datasets + * so that their memory components will be copied to the failing back node + */ + if (releaseMetadataNode) { + appContext.getDatasetLifecycleManager().closeAllDatasets(); + //remove the metadata node stub from RMI registry + try { + appContext.unexportMetadataNodeStub(); + } catch (RemoteException e) { + LOGGER.log(Level.SEVERE, "Failed unexporting metadata stub", e); + throw ExceptionUtils.convertToHyracksDataException(e); + } + } else { + //close all non-metadata datasets + appContext.getDatasetLifecycleManager().closeUserDatasets(); + //flush the remaining metadata datasets that were not closed + appContext.getDatasetLifecycleManager().flushAllDatasets(); + } + + //mark the partitions to be closed as inactive + PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext + .getLocalResourceRepository(); + for (Integer partitionId : partitions) { + localResourceRepo.addInactivePartition(partitionId); + } + + //send response after partitions prepared for failback + PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(planId, + requestId, partitions); + try { + broker.sendMessageToCC(reponse, null); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Failed sending message to cc", e); + throw ExceptionUtils.convertToHyracksDataException(e); + } + } + + @Override + public String type() { + return "PREPARE_PARTITIONS_FAILBACK_REQUEST"; + } +} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java new file mode 100644 index 0000000..2e52773 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.message; + +import java.util.Set; + +import org.apache.asterix.runtime.util.AsterixClusterProperties; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; + +public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage { + + private static final long serialVersionUID = 1L; + private final Set partitions; + + public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set partitions) { + super(planId, requestId); + this.partitions = partitions; + } + + public Set getPartitions() { + return partitions; + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException { + AsterixClusterProperties.INSTANCE.processPreparePartitionsFailbackResponse(this); + } + + @Override + public String type() { + return "PREPARE_PARTITIONS_FAILBACK_RESPONSE"; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java new file mode 100644 index 0000000..2aa4746 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.message; + +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.messaging.AbstractApplicationMessage; +import org.apache.asterix.common.replication.Replica; +import org.apache.asterix.common.replication.ReplicaEvent; +import org.apache.asterix.event.schema.cluster.Node; +import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class ReplicaEventMessage extends AbstractApplicationMessage { + + private static final long serialVersionUID = 1L; + private final String nodeId; + private final ClusterEventType event; + private final String nodeIPAddress; + + public ReplicaEventMessage(String nodeId, String nodeIPAddress, ClusterEventType event) { + this.nodeId = nodeId; + this.nodeIPAddress = nodeIPAddress; + this.event = event; + } + + public String getNodeId() { + return nodeId; + } + + public ClusterEventType getEvent() { + return event; + } + + public String getNodeIPAddress() { + return nodeIPAddress; + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException { + NodeControllerService ncs = (NodeControllerService) cs; + IAsterixAppRuntimeContext appContext = + (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + Node node = new Node(); + node.setId(nodeId); + 
node.setClusterIp(nodeIPAddress); + Replica replica = new Replica(node); + appContext.getReplicationManager().reportReplicaEvent(new ReplicaEvent(replica, event)); + } + + @Override + public String type() { + return "REPLICA_EVENT"; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java new file mode 100644 index 0000000..1604e29 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.message; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.AbstractApplicationMessage; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; +import org.apache.asterix.common.transactions.IAsterixResourceIdManager; +import org.apache.asterix.runtime.util.AsterixAppContextInfo; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class ReportMaxResourceIdMessage extends AbstractApplicationMessage { + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName()); + private final long maxResourceId; + private final String src; + + public ReportMaxResourceIdMessage(String src, long maxResourceId) { + this.src = src; + this.maxResourceId = maxResourceId; + } + + public long getMaxResourceId() { + return maxResourceId; + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException { + IAsterixResourceIdManager resourceIdManager = + AsterixAppContextInfo.INSTANCE.getResourceIdManager(); + resourceIdManager.report(src, maxResourceId); + } + + public static void send(NodeControllerService cs) throws HyracksDataException { + NodeControllerService ncs = cs; + IAsterixAppRuntimeContext appContext = + (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); + long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(), + MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); + ReportMaxResourceIdMessage maxResourceIdMsg = new 
ReportMaxResourceIdMessage(ncs.getId(), maxResourceId); + try { + ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg, null); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e); + throw ExceptionUtils.convertToHyracksDataException(e); + } + } + + @Override + public String type() { + return "REPORT_MAX_RESOURCE_ID_RESPONSE"; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java new file mode 100644 index 0000000..3e2becf --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.message; + +import org.apache.asterix.common.messaging.AbstractApplicationMessage; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage { + private static final long serialVersionUID = 1L; + + @Override + public void handle(IControllerService cs) throws HyracksDataException { + ReportMaxResourceIdMessage.send((NodeControllerService) cs); + } + + @Override + public String type() { + return "REPORT_MAX_RESOURCE_ID_REQUEST"; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java new file mode 100644 index 0000000..5e7d808 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.message; + +import java.util.Set; + +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.messaging.AbstractApplicationMessage; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.transactions.IAsterixResourceIdManager; +import org.apache.asterix.runtime.util.AsterixAppContextInfo; +import org.apache.asterix.runtime.util.AsterixClusterProperties; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; + +public class ResourceIdRequestMessage extends AbstractApplicationMessage { + private static final long serialVersionUID = 1L; + private final String src; + + public ResourceIdRequestMessage(String src) { + this.src = src; + } + + @Override + public void handle(IControllerService cs) throws HyracksDataException { + try { + ICCMessageBroker broker = + (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker(); + ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage(); + reponse.setId(id); + if (!AsterixClusterProperties.INSTANCE.isClusterActive()) { + reponse.setResourceId(-1); + reponse.setException(new Exception("Cannot generate global resource id when cluster is not active.")); + } else { + IAsterixResourceIdManager resourceIdManager = + AsterixAppContextInfo.INSTANCE.getResourceIdManager(); + reponse.setResourceId(resourceIdManager.createResourceId()); + if (reponse.getResourceId() < 0) { + 
reponse.setException(new Exception("One or more nodes has not reported max resource id.")); + } + requestMaxResourceID(resourceIdManager, broker); + } + broker.sendApplicationMessageToNC(reponse, src); + } catch (Exception e) { + throw ExceptionUtils.convertToHyracksDataException(e); + } + } + + private void requestMaxResourceID(IAsterixResourceIdManager resourceIdManager, ICCMessageBroker broker) + throws Exception { + Set getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes(); + ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage(); + msg.setId(ICCMessageBroker.NO_CALLBACK_MESSAGE_ID); + for (String nodeId : getParticipantNodes) { + if (!resourceIdManager.reported(nodeId)) { + broker.sendApplicationMessageToNC(msg, nodeId); + } + } + } + + @Override + public String type() { + return "RESOURCE_ID_REQUEST"; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java new file mode 100644 index 0000000..62c2163 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.asterix.runtime.message;

import org.apache.asterix.common.messaging.AbstractApplicationMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;

/**
 * Response to a resource id request. It is a plain holder for the generated
 * resource id and, optionally, an exception describing why no id was produced.
 */
public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage {
    private static final long serialVersionUID = 1L;

    private long resourceId;
    private Exception exception;

    /**
     * Intentionally empty: this response is consumed by the requester's callback
     * rather than by this method. A possible future cleanup is to drop callbacks
     * and perform the callback action here instead.
     */
    @Override
    public void handle(IControllerService cs) throws HyracksDataException {
        // no-op by design; see the method documentation
    }

    @Override
    public String type() {
        return "RESOURCE_ID_RESPONSE";
    }

    /** @return the resource id carried by this response */
    public long getResourceId() {
        return resourceId;
    }

    /** @param resourceId the resource id to carry in this response */
    public void setResourceId(long resourceId) {
        this.resourceId = resourceId;
    }

    /** @return the exception attached to this response, or {@code null} if none was set */
    public Exception getException() {
        return exception;
    }

    /** @param exception the failure to attach to this response */
    public void setException(Exception exception) {
        this.exception = exception;
    }
}