airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject git commit: fixing the thread creation in airavata-api
Date Fri, 24 Oct 2014 14:25:02 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 1d83a48b7 -> e5131ad87


fixing the thread creation in airavata-api


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e5131ad8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e5131ad8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e5131ad8

Branch: refs/heads/master
Commit: e5131ad8780045412660b5f4a9fa59848998e1fc
Parents: 1d83a48
Author: lahiru <lahiru@apache.org>
Authored: Fri Oct 24 02:55:48 2014 -0400
Committer: lahiru <lahiru@apache.org>
Committed: Fri Oct 24 02:55:48 2014 -0400

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   | 129 +++++++++++--------
 .../main/resources/airavata-server.properties   |   1 +
 .../monitor/impl/pull/qstat/HPCPullMonitor.java |   4 +-
 3 files changed, 78 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e5131ad8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index 76100ff..5753e3e 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -27,6 +27,7 @@ import org.apache.aiaravata.application.catalog.data.resources.*;
 import org.apache.aiaravata.application.catalog.data.util.AppCatalogThriftConversion;
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.api.airavataAPIConstants;
+import org.apache.airavata.api.server.util.AiravataServerThreadPoolExecutor;
 import org.apache.airavata.api.server.util.DataModelUtils;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -1165,7 +1166,7 @@ public class AiravataServerHandler implements Airavata.Iface {
                 logger.debugId(airavataExperimentId, "Launching single application experiment
{}.", airavataExperimentId);
                 final OrchestratorService.Client orchestratorClient = getOrchestratorClient();
                 if (orchestratorClient.validateExperiment(expID)) {
-                   launchSingleAppExperiment(expID, token, orchestratorClient);
+                   AiravataServerThreadPoolExecutor.getFixedThreadPool().execute(new SingleAppExperimentRunner(expID,
token, orchestratorClient));
                 } else {
                     logger.errorId(airavataExperimentId, "Experiment validation failed. Please
check the configurations.");
                     throw new InvalidRequestException("Experiment Validation Failed, please
check the configuration");
@@ -1199,66 +1200,86 @@ public class AiravataServerHandler implements Airavata.Iface {
             logger.errorId(experimentId, "Error while launching experiment.", e);
         }
     }
-    
-    private boolean launchSingleAppExperiment(String experimentId, String airavataCredStoreToken,
OrchestratorService.Client orchestratorClient) throws TException {
-        Experiment experiment = null;
-        try {
-            List<String> ids = registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL,
WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-            for (String workflowNodeId : ids) {
+
+    private class SingleAppExperimentRunner implements Runnable {
+
+        String experimentId;
+        String airavataCredStoreToken;
+        Client client;
+        public SingleAppExperimentRunner(String experimentId,String airavataCredStoreToken,Client
client){
+            this.experimentId = experimentId;
+            this.airavataCredStoreToken = airavataCredStoreToken;
+            this.client = client;
+        }
+        @Override
+        public void run() {
+            try {
+                launchSingleAppExperiment();
+            } catch (TException e) {
+                e.printStackTrace();
+            }
+        }
+
+        private boolean launchSingleAppExperiment() throws TException {
+            Experiment experiment = null;
+            try {
+                List<String> ids = registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL,
WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+                for (String workflowNodeId : ids) {
 //                WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry.get(RegistryModelType.WORKFLOW_NODE_DETAIL,
workflowNodeId);
-                List<Object> taskDetailList = registry.get(RegistryModelType.TASK_DETAIL,
TaskDetailConstants.NODE_ID, workflowNodeId);
-                for (Object o : taskDetailList) {
-                    TaskDetails taskData = (TaskDetails) o;
-                    //iterate through all the generated tasks and performs the job submisssion+monitoring
-                    experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT,
experimentId);
-                    if (experiment == null) {
-                        logger.errorId(experimentId, "Error retrieving the Experiment by
the given experimentID: {}", experimentId);
-                        return false;
-                    }
-                    ExperimentStatus status = new ExperimentStatus();
-                    status.setExperimentState(ExperimentState.LAUNCHED);
-                    status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-                    experiment.setExperimentStatus(status);
-                    registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
-                    if (ServerSettings.isRabbitMqPublishEnabled()){
-                        String gatewayId = ServerSettings.getDefaultUserGateway();
-                        ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED,
-                                experimentId,
-                                gatewayId);
-                        String messageId = AiravataUtils.getId("EXPERIMENT");
-                        MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT,messageId,gatewayId);
-                        messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-                        publisher.publish(messageContext);
+                    List<Object> taskDetailList = registry.get(RegistryModelType.TASK_DETAIL,
TaskDetailConstants.NODE_ID, workflowNodeId);
+                    for (Object o : taskDetailList) {
+                        TaskDetails taskData = (TaskDetails) o;
+                        //iterate through all the generated tasks and performs the job submisssion+monitoring
+                        experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT,
experimentId);
+                        if (experiment == null) {
+                            logger.errorId(experimentId, "Error retrieving the Experiment
by the given experimentID: {}", experimentId);
+                            return false;
+                        }
+                        ExperimentStatus status = new ExperimentStatus();
+                        status.setExperimentState(ExperimentState.LAUNCHED);
+                        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+                        experiment.setExperimentStatus(status);
+                        registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
+                        if (ServerSettings.isRabbitMqPublishEnabled()) {
+                            String gatewayId = ServerSettings.getDefaultUserGateway();
+                            ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED,
+                                    experimentId,
+                                    gatewayId);
+                            String messageId = AiravataUtils.getId("EXPERIMENT");
+                            MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT,
messageId, gatewayId);
+                            messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+                            publisher.publish(messageContext);
+                        }
+                        registry.update(RegistryModelType.TASK_DETAIL, taskData, taskData.getTaskID());
+                        //launching the experiment
+                        client.launchTask(taskData.getTaskID(), airavataCredStoreToken);
                     }
-                    registry.update(RegistryModelType.TASK_DETAIL, taskData, taskData.getTaskID());
-                    //launching the experiment
-                    orchestratorClient.launchTask(taskData.getTaskID(), airavataCredStoreToken);
                 }
-            }
 
-        } catch (Exception e) {
-            // Here we really do not have to do much because only potential failure can happen
-            // is in gfac, if there are errors in gfac, it will handle the experiment/task/job
statuses
-            // We might get failures in registry access before submitting the jobs to gfac,
in that case we
-            // leave the status of these as created.
-            ExperimentStatus status = new ExperimentStatus();
-            status.setExperimentState(ExperimentState.FAILED);
-            status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-            experiment.setExperimentStatus(status);
-            try {
-                registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
-            } catch (RegistryException e1) {
-                logger.errorId(experimentId, "Error while updating experiment status to "
+ status.toString(), e);
+            } catch (Exception e) {
+                // Here we really do not have to do much because only potential failure can
happen
+                // is in gfac, if there are errors in gfac, it will handle the experiment/task/job
statuses
+                // We might get failures in registry access before submitting the jobs to
gfac, in that case we
+                // leave the status of these as created.
+                ExperimentStatus status = new ExperimentStatus();
+                status.setExperimentState(ExperimentState.FAILED);
+                status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+                experiment.setExperimentStatus(status);
+                try {
+                    registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
+                } catch (RegistryException e1) {
+                    logger.errorId(experimentId, "Error while updating experiment status
to " + status.toString(), e);
+                    throw new TException(e);
+                }
+                logger.errorId(experimentId, "Error while updating task status, hence updated
experiment status to " + status.toString(), e);
                 throw new TException(e);
-            }
-            logger.errorId(experimentId, "Error while updating task status, hence updated
experiment status to " + status.toString(), e);
-            throw new TException(e);
-        }finally {
-            orchestratorClient.getOutputProtocol().getTransport().close();
-            orchestratorClient.getInputProtocol().getTransport().close();
+            } finally {
+                client.getOutputProtocol().getTransport().close();
+                client.getInputProtocol().getTransport().close();
 
+            }
+            return true;
         }
-        return true;
     }
     
 	private OrchestratorService.Client getOrchestratorClient() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e5131ad8/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 978369e..edb60e2 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -113,6 +113,7 @@ email.from=airavata@apache.org
 #  to interact with Computational Resources.
 #
 gfac.thread.pool.size=50
+airavata.server.thread.pool.size=50
 gfac=org.apache.airavata.gfac.server.GfacServer
 myproxy.server=myproxy.teragrid.org
 myproxy.username=ogce

http://git-wip-us.apache.org/repos/asf/airavata/blob/e5131ad8/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index e165bfd..d3c3df8 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -295,8 +295,8 @@ public class HPCPullMonitor extends PullMonitor {
             for (String jobName: keys) {
                 MonitorID completedJob = completedJobs.get(jobName);
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
-                    gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext());
-//                  GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac,
completedJob, publisher));
+//                    gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext());
+                  GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac,
completedJob, publisher));
                 if (zk == null) {
                     zk = completedJob.getJobExecutionContext().getZk();
                 }


Mime
View raw message