airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dimuthu...@apache.org
Subject [airavata] branch develop updated: Refactoring Helix Task Context to support group resource profile integration
Date Fri, 14 Sep 2018 20:18:43 GMT
This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new c377ff2  Refactoring Helix Task Context to support group resource profile integration
c377ff2 is described below

commit c377ff28f27934cfb85fef088fdad863324cd660
Author: Dimuthu Wannipurage <dimuthu.wannipurage@datasprouts.com>
AuthorDate: Fri Sep 14 16:18:34 2018 -0400

    Refactoring Helix Task Context to support group resource profile integration
---
 .../airavata/helix/impl/task/AiravataTask.java     |  11 +-
 .../airavata/helix/impl/task/TaskContext.java      | 298 ++++++++++++---------
 .../helix/impl/task/staging/DataStagingTask.java   |   3 +-
 3 files changed, 175 insertions(+), 137 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index d2e03d8..ebf6784 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -253,7 +253,7 @@ public abstract class AiravataTask extends AbstractTask {
                         dataProductModel.setDataProductType(DataProductType.FILE);
 
                         DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel();
-                        replicaLocationModel.setStorageResourceId(getTaskContext().getStorageResource().getStorageResourceId());
+                        replicaLocationModel.setStorageResourceId(getTaskContext().getStorageResourceDescription().getStorageResourceId());
                         replicaLocationModel.setReplicaName(outputName + " gateway data store
copy");
                         replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
                         replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
@@ -383,14 +383,7 @@ public abstract class AiravataTask extends AbstractTask {
             TaskContext.TaskContextBuilder taskContextBuilder = new TaskContext.TaskContextBuilder(getProcessId(),
getGatewayId(), getTaskId())
                     .setRegistryClient(getRegistryServiceClient())
                     .setProcessModel(getProcessModel())
-                    .setStatusPublisher(getStatusPublisher())
-                    .setGatewayResourceProfile(getRegistryServiceClient().getGatewayResourceProfile(gatewayId))
-                    .setGatewayComputeResourcePreference(
-                            getRegistryServiceClient().getGatewayComputeResourcePreference(gatewayId,
-                                    processModel.getComputeResourceId()))
-                    .setGatewayStorageResourcePreference(
-                            getRegistryServiceClient().getGatewayStoragePreference(gatewayId,
-                                    processModel.getStorageResourceId()));
+                    .setStatusPublisher(getStatusPublisher());
 
             this.taskContext = taskContextBuilder.build();
             logger.info("Task " + this.taskName + " initialized");
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 0bafed4..a94e7f4 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -28,6 +28,8 @@ import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
 import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
 import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
+import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupComputeResourcePreference;
+import org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile;
 import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
 import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference;
 import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile;
@@ -44,10 +46,6 @@ import org.apache.airavata.model.status.TaskState;
 import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskModel;
 import org.apache.airavata.registry.api.RegistryService;
-import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.apache.airavata.registry.cpi.ExperimentCatalog;
-import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,70 +53,71 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.*;
 
+/**
+ * Note: process context property use lazy loading approach. In runtime you will see some
properties as null
+ * unless you have access it previously. Once that property access using the api,it will
be set to correct value.
+ */
 public class TaskContext {
 
     private final static Logger logger = LoggerFactory.getLogger(TaskContext.class);
-    // process model
+
     private Publisher statusPublisher;
-    private final String processId;
-    private final String gatewayId;
-    //private final String tokenId;
+    private RegistryService.Client registryClient;
+
+    private String processId;
+    private String gatewayId;
+    private String taskId;
+
     private ProcessModel processModel;
+    private JobModel jobModel;
+    private Object subTaskModel = null;
+
     private String workingDir;
     private String scratchLocation;
     private String inputDir;
     private String outputDir;
-    private String localWorkingDir;
+    private String stdoutLocation;
+    private String stderrLocation;
+
     private GatewayResourceProfile gatewayResourceProfile;
+    private UserResourceProfile userResourceProfile;
+    private GroupResourceProfile groupResourceProfile;
+
     private ComputeResourcePreference gatewayComputeResourcePreference;
     private StoragePreference gatewayStorageResourcePreference;
-    private UserResourceProfile userResourceProfile;
     private UserComputeResourcePreference userComputeResourcePreference;
     private UserStoragePreference userStoragePreference;
+    private GroupComputeResourcePreference groupComputeResourcePreference;
+
     private ComputeResourceDescription computeResourceDescription;
     private ApplicationDeploymentDescription applicationDeploymentDescription;
     private ApplicationInterfaceDescription applicationInterfaceDescription;
-    private Map<String, String> sshProperties;
-    private String stdoutLocation;
-    private String stderrLocation;
+    private StorageResourceDescription storageResourceDescription;
+
     private JobSubmissionProtocol jobSubmissionProtocol;
     private DataMovementProtocol dataMovementProtocol;
-    private JobModel jobModel;
-    private StorageResourceDescription storageResource;
-    private MonitorMode monitorMode;
     private ResourceJobManager resourceJobManager;
-    private boolean handOver;
-    private boolean cancel;
+
     private List<String> taskExecutionOrder;
     private List<TaskModel> taskList;
     private Map<String, TaskModel> taskMap;
-    private boolean pauseTaskExecution = false;  // Task can pause task execution by setting
this value
-    private boolean complete = false; // all tasks executed?
-    private boolean recovery = false; // is process in recovery mode?
-    private TaskModel currentExecutingTaskModel; // current execution task model in case
we pause process execution we need this to continue process exectuion again
-    private boolean acknowledge;
-    private boolean recoveryWithCancel = false;
-    private String usageReportingGatewayId;
-    private List<String> queueSpecificMacros;
-    private String taskId;
-    private Object subTaskModel = null;
-    private RegistryService.Client registryClient;
 
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
 
-    /**
-     * Note: process context property use lazy loading approach. In runtime you will see
some properties as null
-     * unless you have access it previously. Once that property access using the api,it will
be set to correct value.
-     */
-    private TaskContext(String processId, String gatewayId, String taskId) {
-        this.processId = processId;
+    public void setGatewayId(String gatewayId) {
         this.gatewayId = gatewayId;
-        this.taskId = taskId;
     }
 
     public String getGatewayId() {
         return gatewayId;
     }
 
+    public void setProcessId(String processId) {
+        this.processId = processId;
+    }
+
     public String getProcessId() {
         return processId;
     }
@@ -160,13 +159,17 @@ public class TaskContext {
                 scratchLocation = userComputeResourcePreference.getScratchLocation();
             } else if (isValid(processModel.getProcessResourceSchedule().getOverrideScratchLocation()))
{
                 scratchLocation = processModel.getProcessResourceSchedule().getOverrideScratchLocation();
-            }else {
+            } else if (isSetGroupResourceProfile() && groupComputeResourcePreference
!= null &&
+                    isValid(groupComputeResourcePreference.getScratchLocation())) {
+                scratchLocation = groupComputeResourcePreference.getScratchLocation();
+            } else {
                 scratchLocation = gatewayComputeResourcePreference.getScratchLocation();
             }
         }
         return scratchLocation;
     }
 
+
     public void setWorkingDir(String workingDir) {
         this.workingDir = workingDir;
     }
@@ -179,6 +182,22 @@ public class TaskContext {
         this.gatewayResourceProfile = gatewayResourceProfile;
     }
 
+    public GroupResourceProfile getGroupResourceProfile() {
+        return groupResourceProfile;
+    }
+
+    public void setGroupResourceProfile(GroupResourceProfile groupResourceProfile) {
+        this.groupResourceProfile = groupResourceProfile;
+    }
+
+    public GroupComputeResourcePreference getGroupComputeResourcePreference() {
+        return groupComputeResourcePreference;
+    }
+
+    public void setGroupComputeResourcePreference(GroupComputeResourcePreference groupComputeResourcePreference)
{
+        this.groupComputeResourcePreference = groupComputeResourcePreference;
+    }
+
     public UserResourceProfile getUserResourceProfile() {
         return userResourceProfile;
     }
@@ -211,15 +230,6 @@ public class TaskContext {
         this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
     }
 
-
-    public Map<String, String> getSshProperties() {
-        return sshProperties;
-    }
-
-    public void setSshProperties(Map<String, String> sshProperties) {
-        this.sshProperties = sshProperties;
-    }
-
     public ComputeResourceDescription getComputeResourceDescription() {
         return computeResourceDescription;
     }
@@ -285,7 +295,11 @@ public class TaskContext {
 
     public JobSubmissionProtocol getJobSubmissionProtocol() {
         if (jobSubmissionProtocol == null) {
-            jobSubmissionProtocol = gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+            if (isSetGroupResourceProfile() && groupComputeResourcePreference !=
null) {
+                jobSubmissionProtocol = groupComputeResourcePreference.getPreferredJobSubmissionProtocol();
+            } else {
+                jobSubmissionProtocol = gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+            }
         }
         return jobSubmissionProtocol;
     }
@@ -296,7 +310,11 @@ public class TaskContext {
 
     public DataMovementProtocol getDataMovementProtocol() {
         if (dataMovementProtocol == null) {
-            dataMovementProtocol = gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
+            if (isSetGroupResourceProfile() && groupComputeResourcePreference !=
null) {
+                dataMovementProtocol = groupComputeResourcePreference.getPreferredDataMovementProtocol();
+            } else {
+                dataMovementProtocol = gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
+            }
         }
         return dataMovementProtocol;
     }
@@ -409,7 +427,7 @@ public class TaskContext {
                 isValid(userComputeResourcePreference.getComputeResourceId())) {
             return userComputeResourcePreference.getComputeResourceId();
         } else {
-            return gatewayComputeResourcePreference.getComputeResourceId();
+            return groupComputeResourcePreference.getComputeResourceId();
         }
     }
 
@@ -421,12 +439,13 @@ public class TaskContext {
             } else {
                 return userResourceProfile.getCredentialStoreToken();
             }
+        }  else if (isSetGroupResourceProfile() &&
+                groupComputeResourcePreference != null &&
+                isValid(groupComputeResourcePreference.getResourceSpecificCredentialStoreToken()))
{
+            return groupComputeResourcePreference.getResourceSpecificCredentialStoreToken();
         } else {
-            if (isValid(gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken()))
{
-                return gatewayComputeResourcePreference.getResourceSpecificCredentialStoreToken();
-            } else {
-                return gatewayResourceProfile.getCredentialStoreToken();
-            }
+            // FIXME: fallback to credential store token on GroupResourceProfile (see AIRAVATA-2865)
+            return gatewayResourceProfile.getCredentialStoreToken();
         }
     }
 
@@ -439,19 +458,23 @@ public class TaskContext {
     }
 
     public JobSubmissionProtocol getPreferredJobSubmissionProtocol(){
-        return gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+        if (isSetGroupResourceProfile() &&
+                groupComputeResourcePreference != null &&
+                groupComputeResourcePreference.getPreferredJobSubmissionProtocol() != null)
{
+            return groupComputeResourcePreference.getPreferredJobSubmissionProtocol();
+        } else {
+            return gatewayComputeResourcePreference.getPreferredJobSubmissionProtocol();
+        }
     }
 
     public DataMovementProtocol getPreferredDataMovementProtocol() {
-        return gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
-    }
-
-    public void setMonitorMode(MonitorMode monitorMode) {
-        this.monitorMode = monitorMode;
-    }
-
-    public MonitorMode getMonitorMode() {
-        return monitorMode;
+        if (isSetGroupResourceProfile() &&
+                groupComputeResourcePreference != null &&
+                groupComputeResourcePreference.getPreferredDataMovementProtocol() != null)
{
+            return groupComputeResourcePreference.getPreferredDataMovementProtocol();
+        } else {
+            return gatewayComputeResourcePreference.getPreferredDataMovementProtocol();
+        }
     }
 
     public void setResourceJobManager(ResourceJobManager resourceJobManager) {
@@ -500,34 +523,22 @@ public class TaskContext {
         return processModel.getExperimentId();
     }
 
-    public StorageResourceDescription getStorageResource() {
-        return storageResource;
-    }
-
-    public void setStorageResource(StorageResourceDescription storageResource) {
-        this.storageResource = storageResource;
-    }
-
-    public void setAcknowledge(boolean acknowledge) {
-        this.acknowledge = acknowledge;
-    }
-
-    public boolean isAcknowledge() {
-        return acknowledge;
-    }
-
-    public boolean isRecoveryWithCancel() {
-        return recoveryWithCancel;
+    public StorageResourceDescription getStorageResourceDescription() {
+        return storageResourceDescription;
     }
 
-    public void setRecoveryWithCancel(boolean recoveryWithCancel) {
-        this.recoveryWithCancel = recoveryWithCancel;
+    public void setStorageResourceDescription(StorageResourceDescription storageResourceDescription)
{
+        this.storageResourceDescription = storageResourceDescription;
     }
 
     public boolean isUseUserCRPref() {
         return getProcessModel().isUseUserCRPref();
     }
 
+    public boolean isSetGroupResourceProfile() {
+        return getProcessModel().isSetGroupResourceProfileId();
+    }
+
     public String getComputeResourceLoginUserName(){
         if (isUseUserCRPref() &&
                 userComputeResourcePreference != null &&
@@ -535,6 +546,10 @@ public class TaskContext {
             return userComputeResourcePreference.getLoginUserName();
         } else if (isValid(processModel.getProcessResourceSchedule().getOverrideLoginUserName()))
{
             return processModel.getProcessResourceSchedule().getOverrideLoginUserName();
+        } else if (isSetGroupResourceProfile() &&
+                groupComputeResourcePreference != null &&
+                isValid(groupComputeResourcePreference.getLoginUserName())){
+            return groupComputeResourcePreference.getLoginUserName();
         } else {
             return gatewayComputeResourcePreference.getLoginUserName();
         }
@@ -573,11 +588,28 @@ public class TaskContext {
     }
 
     public String getUsageReportingGatewayId() {
-        return gatewayComputeResourcePreference.getUsageReportingGatewayId();
+        // FIXME: Should GroupResourceProfiles be able to specify a different usage reporting
gateway id?
+	    /* if (isSetGroupResourceProfile() &&
+                groupComputeResourcePreference != null &&
+                isValid(groupComputeResourcePreference.getUsageReportingGatewayId())) {
+	        return groupComputeResourcePreference.getUsageReportingGatewayId();
+        }*/
+	    return gatewayComputeResourcePreference.getUsageReportingGatewayId();
+
     }
 
     public String getAllocationProjectNumber() {
-        return gatewayComputeResourcePreference.getAllocationProjectNumber();
+        if (isUseUserCRPref() &&
+                userComputeResourcePreference != null &&
+                userComputeResourcePreference.getAllocationProjectNumber() != null) {
+            return userComputeResourcePreference.getAllocationProjectNumber();
+        } else if (isSetGroupResourceProfile() &&
+                groupComputeResourcePreference != null &&
+                isValid(groupComputeResourcePreference.getAllocationProjectNumber())){
+            return groupComputeResourcePreference.getAllocationProjectNumber();
+        } else {
+            return null;
+        }
     }
 
     public String getReservation() {
@@ -590,9 +622,9 @@ public class TaskContext {
             start = userComputeResourcePreference.getReservationStartTime();
             end = userComputeResourcePreference.getReservationEndTime();
         } else {
-            reservation = gatewayComputeResourcePreference.getReservation();
-            start = gatewayComputeResourcePreference.getReservationStartTime();
-            end = gatewayComputeResourcePreference.getReservationEndTime();
+            reservation = groupComputeResourcePreference.getReservation();
+            start = groupComputeResourcePreference.getReservationStartTime();
+            end = groupComputeResourcePreference.getReservationEndTime();
         }
         if (reservation != null && start > 0 && start < end) {
             long now = Calendar.getInstance().getTimeInMillis();
@@ -609,7 +641,7 @@ public class TaskContext {
                 isValid(userComputeResourcePreference.getQualityOfService())) {
             return userComputeResourcePreference.getQualityOfService();
         } else {
-            return gatewayComputeResourcePreference.getQualityOfService();
+            return groupComputeResourcePreference.getQualityOfService();
         }
     }
 
@@ -621,6 +653,10 @@ public class TaskContext {
             return userComputeResourcePreference.getPreferredBatchQueue();
         } else if (isValid(processModel.getProcessResourceSchedule().getQueueName())) {
             return processModel.getProcessResourceSchedule().getQueueName();
+        }  else if (isSetGroupResourceProfile() &&
+                groupComputeResourcePreference != null &&
+                isValid(groupComputeResourcePreference.getPreferredBatchQueue())){
+            return groupComputeResourcePreference.getPreferredBatchQueue();
         } else {
             return gatewayComputeResourcePreference.getPreferredBatchQueue();
         }
@@ -687,9 +723,6 @@ public class TaskContext {
         private final String taskId;
         private RegistryService.Client registryClient;
         private Publisher statusPublisher;
-        private GatewayResourceProfile gatewayResourceProfile;
-        private ComputeResourcePreference gatewayComputeResourcePreference;
-        private StoragePreference gatewayStorageResourcePreference;
         private ProcessModel processModel;
 
         @SuppressWarnings("WeakerAccess")
@@ -702,21 +735,6 @@ public class TaskContext {
             this.taskId = taskId;
         }
 
-        public TaskContextBuilder setGatewayResourceProfile(GatewayResourceProfile gatewayResourceProfile)
{
-            this.gatewayResourceProfile = gatewayResourceProfile;
-            return this;
-        }
-
-        public TaskContextBuilder setGatewayComputeResourcePreference(ComputeResourcePreference
gatewayComputeResourcePreference) {
-            this.gatewayComputeResourcePreference = gatewayComputeResourcePreference;
-            return this;
-        }
-
-        public TaskContextBuilder setGatewayStorageResourcePreference(StoragePreference gatewayStorageResourcePreference)
{
-            this.gatewayStorageResourcePreference = gatewayStorageResourcePreference;
-            return this;
-        }
-
         public TaskContextBuilder setProcessModel(ProcessModel processModel) {
             this.processModel = processModel;
             return this;
@@ -733,15 +751,7 @@ public class TaskContext {
         }
 
         public TaskContext build() throws Exception {
-            if (notValid(gatewayResourceProfile)) {
-                throwError("Invalid GatewayResourceProfile");
-            }
-            if (notValid(gatewayComputeResourcePreference)) {
-                throwError("Invalid Gateway ComputeResourcePreference");
-            }
-            if (notValid(gatewayStorageResourcePreference)) {
-                throwError("Invalid Gateway StoragePreference");
-            }
+
             if (notValid(processModel)) {
                 throwError("Invalid Process Model");
             }
@@ -752,17 +762,54 @@ public class TaskContext {
                 throwError("Invalid Status Publisher");
             }
 
-            TaskContext ctx = new TaskContext(processId, gatewayId, taskId);
+            TaskContext ctx = new TaskContext();
             ctx.setRegistryClient(registryClient);
             ctx.setStatusPublisher(statusPublisher);
             ctx.setProcessModel(processModel);
-            ctx.setGatewayResourceProfile(gatewayResourceProfile);
-            ctx.setGatewayComputeResourcePreference(gatewayComputeResourcePreference);
-            ctx.setGatewayStorageResourcePreference(gatewayStorageResourcePreference);
-            ctx.setApplicationDeploymentDescription(registryClient.getApplicationDeployment(processModel.getApplicationDeploymentId()));
-            ctx.setApplicationInterfaceDescription(registryClient.getApplicationInterface(processModel.getApplicationInterfaceId()));
-            ctx.setComputeResourceDescription(registryClient.getComputeResource(ctx.getComputeResourceId()));
-            ctx.setStorageResource(registryClient.getStorageResource(ctx.getStorageResourceId()));
+            ctx.setTaskId(taskId);
+            ctx.setGatewayId(gatewayId);
+            ctx.setProcessId(processId);
+
+            ctx.setGatewayComputeResourcePreference(
+                    Optional.ofNullable(registryClient.getGatewayComputeResourcePreference(
+                            gatewayId,
+                            processModel.getComputeResourceId()))
+                            .orElseThrow(() -> new Exception("Invalid Gateway ComputeResource
Preference")));
+
+            ctx.setGatewayResourceProfile(
+                    Optional.ofNullable(registryClient.getGatewayResourceProfile(gatewayId))
+                            .orElseThrow(() -> new Exception("Invalid GatewayResourceProfile")));
+
+            ctx.setGatewayStorageResourcePreference(
+                    Optional.ofNullable(registryClient.getGatewayStoragePreference(
+                            gatewayId,
+                            processModel.getStorageResourceId()))
+                            .orElseThrow(() -> new Exception("Invalid Gateway StoragePreference")));
+
+            ctx.setApplicationDeploymentDescription(
+                    Optional.ofNullable(registryClient.getApplicationDeployment(
+                            processModel.getApplicationDeploymentId()))
+                            .orElseThrow(() -> new Exception("Invalid Application Deployment")));
+
+            ctx.setApplicationInterfaceDescription(
+                    Optional.ofNullable(registryClient.getApplicationInterface(
+                            processModel.getApplicationInterfaceId()))
+                            .orElseThrow(() -> new Exception("Invalid Application Interface")));
+
+            ctx.setComputeResourceDescription(
+                    Optional.ofNullable(registryClient.getComputeResource(
+                            ctx.getComputeResourceId()))
+                            .orElseThrow(() -> new Exception("Invalid Compute Resource
Description")));
+
+            ctx.setStorageResourceDescription(
+                    Optional.ofNullable(registryClient.getStorageResource(
+                            ctx.getStorageResourceId()))
+                            .orElseThrow(() -> new Exception("Invalid Storage Resource
Description")));
+
+            ctx.setGroupResourceProfile(registryClient.getGroupResourceProfile(processModel.getGroupResourceProfileId()));
+
+            ctx.setGroupComputeResourcePreference(registryClient.getGroupComputeResourcePreference(processModel.getComputeResourceId(),
+                    processModel.getGroupResourceProfileId()));
 
             if (processModel.isUseUserCRPref()) {
                 ctx.setUserResourceProfile(registryClient.getUserResourceProfile(processModel.getUserName(),
gatewayId));
@@ -811,7 +858,6 @@ public class TaskContext {
         private void throwError(String msg) throws Exception {
             throw new Exception(msg);
         }
-
     }
 }
 
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
index fe57776..9d5db8d 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
@@ -28,7 +28,6 @@ import org.apache.airavata.helix.impl.task.TaskOnFailException;
 import org.apache.airavata.helix.task.api.support.AdaptorSupport;
 import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
 import org.apache.airavata.model.task.DataStagingTaskModel;
-import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +56,7 @@ public abstract class DataStagingTask extends AiravataTask {
 
     @SuppressWarnings("WeakerAccess")
     protected StorageResourceDescription getStorageResource() throws TaskOnFailException
{
-        StorageResourceDescription storageResource = getTaskContext().getStorageResource();
+        StorageResourceDescription storageResource = getTaskContext().getStorageResourceDescription();
         if (storageResource == null) {
             throw new TaskOnFailException("Storage resource can not be null for task " +
getTaskId(), true, null);
         }


Mime
View raw message