airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From scnakand...@apache.org
Subject [04/16] airavata git commit: Fixed zookeeper path issue with delivery tag acknowledgement
Date Sat, 05 Sep 2015 07:05:52 GMT
Fixed zookeeper path issue with delivery tag acknowledgement


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/0cb9f312
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/0cb9f312
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/0cb9f312

Branch: refs/heads/master
Commit: 0cb9f312476023fd17f628d232e385dfb5123e96
Parents: 1675d74
Author: Shameera Rathanyaka <shameerainfo@gmail.com>
Authored: Fri Sep 4 11:47:02 2015 -0400
Committer: Supun Nakandala <scnakandala@apache.org>
Committed: Sat Sep 5 12:24:22 2015 +0530

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/GFacUtils.java    | 150 ++-----------------
 .../apache/airavata/gfac/impl/GFacWorker.java   |   3 +-
 .../airavata/gfac/server/GfacServerHandler.java |   5 +-
 3 files changed, 16 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/0cb9f312/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index d3d4c7e..1870255 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -541,32 +541,7 @@ public class GFacUtils {
         }
     }
 
-    /**
-     * This will return a value if the server is down because we iterate through exisiting
experiment nodes, not
-     * through gfac-server nodes
-     *
-     * @param experimentID
-     * @param curatorClient
-     * @return
-     * @throws KeeperException
-     * @throws InterruptedException
-     */
-    public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient)
throws Exception {
-        String experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE;
-        List<String> children = curatorClient.getChildren().forPath(experimentNode);
-        for (String pickedChild : children) {
-            String experimentPath = experimentNode + File.separator + pickedChild;
-            String newExpNode = experimentPath + File.separator + experimentID;
-            Stat exists = curatorClient.checkExists().forPath(newExpNode);
-            if (exists == null) {
-                continue;
-            } else {
-                return newExpNode;
-            }
-        }
-        return null;
-    }
-
+	// Fixme - remove this method. with new changes we don't need to use this method.
     public static boolean setExperimentCancelRequest(String processId, CuratorFramework curatorClient, long
 		    deliveryTag) throws Exception {
 	    String experimentNode = ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
@@ -576,56 +551,6 @@ public class GFacUtils {
 	    return true;
     }
 
-    public static boolean isCancelled(String experimentID, CuratorFramework curatorClient)
throws Exception {
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient);
-        if (experimentEntry == null) {
-            return false;
-        } else {
-            Stat exists = curatorClient.checkExists().forPath(experimentEntry);
-            if (exists != null) {
-                String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + File.separator + "operation"));
-                if ("cancel".equals(operation)) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
-//    public static void saveHandlerData(JobExecutionContext jobExecutionContext,
-//                                       StringBuffer data, String className) throws GFacHandlerException
{
-//		try {
-//			CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
-//			if (curatorClient != null) {
-//				String expZnodeHandlerPath = AiravataZKUtils
-//						.getExpZnodeHandlerPath(
-//								jobExecutionContext.getExperimentID(),
-//								className);
-//				Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
-//                if (exists != null) {
-//					curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath,
data.toString().getBytes());
-//				} else {
-//                    log.error("Saving Handler data failed, Stat is null");
-//                }
-//            }
-//		} catch (Exception e) {
-//			throw new GFacHandlerException(e);
-//		}
-//	}
-
-//    public static String getHandlerData(ProcessContext processContext, String className)
throws Exception {
-//        CuratorFramework curatorClient = processContext.getCuratorClient();
-//        if (curatorClient != null) {
-//            String expZnodeHandlerPath = AiravataZKUtils
-//                    .getExpZnodeHandlerPath(
-//                            processContext.getExperimentID(),
-//                            className);
-//            Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
-//            return new String(processContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
-//        }
-//        return null;
-//    }
-
     public static CredentialReader getCredentialReader()
             throws ApplicationSettingsException, IllegalAccessException,
             InstantiationException {
@@ -712,62 +637,6 @@ public class GFacUtils {
         return buffer.getLong();
     }
 
-    public static ExperimentState updateExperimentStatus(String experimentId, ExperimentState
state) throws RegistryException {
-        ExperimentCatalog airavataExperimentCatalog = RegistryFactory.getDefaultExpCatalog();
-        ExperimentModel details = (ExperimentModel) airavataExperimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT,
experimentId);
-        if (details == null) {
-            details = new ExperimentModel();
-            details.setExperimentId(experimentId);
-        }
-        ExperimentStatus status = new ExperimentStatus();
-        status.setState(state);
-        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-        if (!ExperimentState.CANCELED.equals(details.getExperimentStatus().getState()) &&
-                !ExperimentState.CANCELING.equals(details.getExperimentStatus().getState()))
{
-            status.setState(state);
-        } else {
-            status.setState(details.getExperimentStatus().getState());
-        }
-        details.setExperimentStatus(status);
-        log.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getState().toString());
-        airavataExperimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status,
experimentId);
-        return details.getExperimentStatus().getState();
-    }
-
-//    public static boolean isFailedJob(JobExecutionContext jec) {
-////        JobStatus jobStatus = jec.getJobDetails().getJobStatus();
-////        if (jobStatus.getJobState() == JobState.FAILED) {
-////            return true;
-////        }
-//        return false;
-//    }
-
-    public static boolean ackCancelRequest(String experimentId, CuratorFramework curatorClient)
throws Exception {
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
-        String cancelNodePath = experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
-        if (experimentEntry == null) {
-            // This should be handle in validation request. Gfac shouldn't get any invalidate
experiment.
-            log.error("Cannot find the experiment Entry, so cancel operation cannot be performed.
" +
-                    "This happen when experiment completed and already removed from the CuratorFramework");
-        } else {
-            // check cancel operation is being processed for the same experiment.
-            Stat cancelState = curatorClient.checkExists().forPath(cancelNodePath);
-            if (cancelState != null) {
-                ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
cancelNodePath, true);
-                return true;
-            }
-        }
-        return false;
-    }
-
-//    public static void publishTaskStatus (JobExecutionContext jobExecutionContext, LocalEventPublisher
publisher, TaskStatus state){
-//        TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-//                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-//                jobExecutionContext.getExperimentID(),
-//                jobExecutionContext.getGatewayID());
-//        publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
-//    }
-
     public static String getZKGfacServersParentPath() {
         return ZKPaths.makePath(ZkConstants.ZOOKEEPER_SERVERS_NODE, ZkConstants.ZOOKEEPER_GFAC_SERVER_NODE);
     }
@@ -1114,14 +983,19 @@ public class GFacUtils {
     }
 
 	public static String getExperimentNodePath(String experimentId) {
-		return ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId;
+		return ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId);
 	}
 
-	public static long getProcessDeliveryTag(CuratorFramework curatorClient, String processId)
throws Exception {
-		String deliveryTagPath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + ZkConstants
-				.ZOOKEEPER_DELIVERYTAG_NODE;
-		byte[] bytes = curatorClient.getData().forPath(deliveryTagPath);
-		return GFacUtils.bytesToLong(bytes);
+	public static long getProcessDeliveryTag(CuratorFramework curatorClient, String experimentId, String processId) throws Exception {
+		String deliveryTagPath = ZKPaths.makePath(ZKPaths.makePath(getExperimentNodePath(experimentId), processId),
+				ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+		Stat stat = curatorClient.checkExists().forPath(deliveryTagPath);
+		if (stat != null) {
+			byte[] bytes = curatorClient.getData().forPath(deliveryTagPath);
+			return GFacUtils.bytesToLong(bytes);
+		} else {
+			throw new GFacException("Couldn't fine the deliveryTag path: " + deliveryTagPath);
+		}
 	}
 
 	public static void saveJobModel(ProcessContext processContext, JobModel jobModel) throws
GFacException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/0cb9f312/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 49dc45d..cdbca05 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -182,7 +182,8 @@ public class GFacWorker implements Runnable {
 
 	private void sendAck() {
 		try {
-			long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(), processId);
+			long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),
+					processContext.getExperimentId(), processId);
 			Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
 			log.info("expId: {}, procesId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(),
 					processId, processDeliveryTag);

http://git-wip-us.apache.org/repos/asf/airavata/blob/0cb9f312/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 1040b05..8427cdc 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -314,7 +314,7 @@ public class GfacServerHandler implements GfacService.Iface {
 		long deliveryTag = messageContext.getDeliveryTag();
 
 		// create /experiments//{experimentId}{processId} node and set data - serverName, add redelivery
listener
-		String experimentNodePath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + experimentId;
+		String experimentNodePath = GFacUtils.getExperimentNodePath(experimentId);
 		String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, processId);
 		ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath);
 		curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes());
@@ -347,8 +347,7 @@ public class GfacServerHandler implements GfacService.Iface {
 		String experimentId = event.getExperimentId();
 		String processId = event.getProcessId();
 		long deliveryTag = messageContext.getDeliveryTag();
-		String processNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
-				experimentId), processId);
+		String processNodePath = ZKPaths.makePath(GFacUtils.getExperimentNodePath(experimentId), processId);
 		Stat stat = curatorClient.checkExists().forPath(processNodePath);
 		if (stat != null) {
 			// create /experiments/{processId}/deliveryTag node and set data - deliveryTag


Mime
View raw message