airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ramin...@apache.org
Subject git commit: Added new method to get monitor command and create client without zookeeper.
Date Thu, 14 Aug 2014 02:33:51 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 3721954dd -> f37dad87a


Added new method to get monitor command and create client without
zookeeper.

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f37dad87
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f37dad87
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f37dad87

Branch: refs/heads/master
Commit: f37dad87a0d86692abbab89cf3abd6010d7302e0
Parents: 3721954
Author: raminder <raminder@apache.org>
Authored: Wed Aug 13 22:33:13 2014 -0400
Committer: raminder <raminder@apache.org>
Committed: Wed Aug 13 22:33:13 2014 -0400

----------------------------------------------------------------------
 .../core/impl/GFACServiceJobSubmitter.java      | 155 ++++++++++---------
 .../gsi/ssh/impl/GSISSHAbstractCluster.java     |  13 +-
 .../airavata/gsi/ssh/impl/RawCommandInfo.java   |   6 +-
 3 files changed, 90 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/f37dad87/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index 1dd8dbb..a2c153e 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -17,7 +17,7 @@
  * specific language governing permissions and limitations
  * under the License.
  *
-*/
+ */
 package org.apache.airavata.orchestrator.core.impl;
 
 import java.io.File;
@@ -29,6 +29,8 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.cpi.GFac;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
@@ -50,86 +52,87 @@ import org.slf4j.LoggerFactory;
  * gfac instance.
  */
 public class GFACServiceJobSubmitter implements JobSubmitter, Watcher {
-    private final static Logger logger = LoggerFactory.getLogger(GFACServiceJobSubmitter.class);
-    public static final String IP = "ip";
-
-    private OrchestratorContext orchestratorContext;
-
-    private static Integer mutex = -1;
-
-    public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException
{
-        this.orchestratorContext = orchestratorContext;
-    }
-
-    public GFACInstance selectGFACInstance() throws OrchestratorException {
-        // currently we only support one instance but future we have to pick an instance
-        return null;
-    }
+	private final static Logger logger = LoggerFactory.getLogger(GFACServiceJobSubmitter.class);
+	public static final String IP = "ip";
 
+	private OrchestratorContext orchestratorContext;
 
-    public boolean submit(String experimentID, String taskID) throws OrchestratorException
{
-        return this.submit(experimentID, taskID, null);
-    }
+	private static Integer mutex = -1;
 
+	public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException
{
+		this.orchestratorContext = orchestratorContext;
+	}
 
-    public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException
{
-        ZooKeeper zk = orchestratorContext.getZk();
-        try {
-            if (zk==null || !zk.getState().isConnected()) {
-                String zkhostPort = AiravataZKUtils.getZKhostPort();
-                zk = new ZooKeeper(zkhostPort, 6000, this);
-                synchronized (mutex) {
-                    mutex.wait();
-                }
-            }
-            String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE,
"/gfac-server");
-            String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
"/gfac-experiments");
-            List<String> children = zk.getChildren(gfacServer, this);
-//            System.out.println(children);
-            String pickedChild;
-            if(children.size() == 0){
-            	pickedChild = "gfac-node0";
-            }else{
-             pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size());
-            }
-            // here we are not using an index because the getChildren does not return the
same order everytime
+	public GFACInstance selectGFACInstance() throws OrchestratorException {
+		// currently we only support one instance but future we have to pick an
+		// instance
+		return null;
+	}
 
-            String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild,
false, null));
-            logger.info("GFAC instance node data: " + gfacNodeData);
-            String[] split = gfacNodeData.split(":");
-            GfacService.Client localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
-            if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {  
   // before submitting the job we check again the state of the node
-                if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode,
pickedChild,tokenId)) {
-                    //FIXME:: The GatewayID is temporarily read from properties file. It
should instead be inferred from the token.
-                    return localhost.submitJob(experimentID, taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME));
-                }
-            }
-        } catch (TException e) {
-            throw new OrchestratorException(e);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        } catch (KeeperException e) {
-            e.printStackTrace();
-        } catch (ApplicationSettingsException e) {
-            e.printStackTrace();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return false;
-    }
+	public boolean submit(String experimentID, String taskID) throws OrchestratorException {
+		return this.submit(experimentID, taskID, null);
+	}
 
+	public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException
{
+		ZooKeeper zk = orchestratorContext.getZk();
+		try {
+			if (zk == null || !zk.getState().isConnected()) {
+				String zkhostPort = AiravataZKUtils.getZKhostPort();
+				zk = new ZooKeeper(zkhostPort, 6000, this);
+				synchronized (mutex) {
+					mutex.wait();
+				}
+			}
+			String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
+			String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
"/gfac-experiments");
+			List<String> children = zk.getChildren(gfacServer, this);
+			
+			if (children.size() == 0) {
+				// Zookeeper data need cleaning
+				GfacService.Client localhost = GFacClientFactory.createGFacClient(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST),
Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)));
+				return localhost.submitJob(experimentID, taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME));
+			} else {
+				String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size());
+				// here we are not using an index because the getChildren does not return the same order
everytime
+				String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild,
false, null));
+				logger.info("GFAC instance node data: " + gfacNodeData);
+				String[] split = gfacNodeData.split(":");
+				GfacService.Client localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
+				if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
+					// before submitting the job we check again the state of the node
+					if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild,
tokenId)) {
+						// FIXME:: The GatewayID is temporarily read from properties file. It should instead
be inferred from the token.
+						return localhost.submitJob(experimentID, taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME));
+					}
+				}
+			}
+		} catch (TException e) {
+			throw new OrchestratorException(e);
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} catch (KeeperException e) {
+			e.printStackTrace();
+		} catch (ApplicationSettingsException e) {
+			e.printStackTrace();
+		} catch (IOException e) {
+			e.printStackTrace();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+		return false;
+	}
 
-    synchronized public void process(WatchedEvent event) {
-        synchronized (mutex) {
-            switch (event.getState()) {
-                case SyncConnected:
-                    mutex.notify();
-            }
-            switch (event.getType()) {
-                case NodeCreated:
-                    mutex.notify();
-                    break;
-            }
-        }
-    }
+	synchronized public void process(WatchedEvent event) {
+		synchronized (mutex) {
+			switch (event.getState()) {
+			case SyncConnected:
+				mutex.notify();
+			}
+			switch (event.getType()) {
+			case NodeCreated:
+				mutex.notify();
+				break;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/f37dad87/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
index 937ed9c..cf65a7f 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
@@ -222,7 +222,7 @@ public class GSISSHAbstractCluster implements Cluster {
 
         StandardOutReader stdOutReader = new StandardOutReader();
         CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
-        String outputifAvailable = getOutputifAvailable(stdOutReader, "Error reading output
of job submission",rawCommandInfo.getCommand());
+        String outputifAvailable = getOutputifAvailable(stdOutReader, "Error reading output
of job submission",rawCommandInfo.getBaseCommand());
         // this might not be the case for all teh resources, if so Cluster implementation
can override this method
         // because here after cancelling we try to get the job description and return it
back
         JobDescriptor jobById = this.getJobDescriptorById(jobID);
@@ -250,7 +250,7 @@ public class GSISSHAbstractCluster implements Cluster {
         //Check whether pbs submission is successful or not, if it failed throw and exception
in submitJob method
         // with the error thrown in qsub command
         //
-        String outputifAvailable = getOutputifAvailable(standardOutReader,"Error reading
output of job submission",rawCommandInfo.getCommand());
+        String outputifAvailable = getOutputifAvailable(standardOutReader,"Error reading
output of job submission",rawCommandInfo.getBaseCommand());
         OutputParser outputParser = jobManagerConfiguration.getParser();
         return  outputParser.parse(outputifAvailable);
     }
@@ -315,7 +315,7 @@ public class GSISSHAbstractCluster implements Cluster {
         RawCommandInfo rawCommandInfo = jobManagerConfiguration.getMonitorCommand(jobID);
         StandardOutReader stdOutReader = new StandardOutReader();
         CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
-        String result = getOutputifAvailable(stdOutReader, "Error getting job information
from the resource !",rawCommandInfo.getCommand());
+        String result = getOutputifAvailable(stdOutReader, "Error getting job information
from the resource !",rawCommandInfo.getBaseCommand());
         JobDescriptor jobDescriptor = new JobDescriptor();
         jobManagerConfiguration.getParser().parse(jobDescriptor,result);
         return jobDescriptor;
@@ -325,7 +325,7 @@ public class GSISSHAbstractCluster implements Cluster {
         RawCommandInfo rawCommandInfo = jobManagerConfiguration.getMonitorCommand(jobID);
         StandardOutReader stdOutReader = new StandardOutReader();
         CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
-        String result = getOutputifAvailable(stdOutReader, "Error getting job information
from the resource !", rawCommandInfo.getCommand());
+        String result = getOutputifAvailable(stdOutReader, "Error getting job information
from the resource !", rawCommandInfo.getBaseCommand());
         return jobManagerConfiguration.getParser().parse(jobID, result);
     }
 
@@ -424,11 +424,10 @@ public class GSISSHAbstractCluster implements Cluster {
     }
 
     public void getJobStatuses(String userName, Map<String,JobStatus> jobIDs)throws
SSHApiException {
-//        RawCommandInfo rawCommandInfo = jobManagerConfiguration.getUserBasedMonitorCommand(userName);
-        RawCommandInfo rawCommandInfo = new RawCommandInfo("qstat -u abc");
+        RawCommandInfo rawCommandInfo = jobManagerConfiguration.getUserBasedMonitorCommand(userName);
         StandardOutReader stdOutReader = new StandardOutReader();
         CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
-        String result = getOutputifAvailable(stdOutReader, "Error getting job information
from the resource !", rawCommandInfo.getCommand());
+        String result = getOutputifAvailable(stdOutReader, "Error getting job information
from the resource !", rawCommandInfo.getBaseCommand());
         jobManagerConfiguration.getParser().parse(userName,jobIDs, result);
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/f37dad87/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java
index 0e9d16e..ab85925 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java
@@ -48,7 +48,11 @@ public class RawCommandInfo implements CommandInfo {
     public String getRawCommand() {
         return rawCommand;
     }
-
+    
+    public String getBaseCommand() {
+        return rawCommand.substring(0, rawCommand.indexOf(" "));
+    }
+    
     public void setRawCommand(String rawCommand) {
         this.rawCommand = rawCommand;
     }


Mime
View raw message