airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sma...@apache.org
Subject [1/2] git commit: added job-throttler prototype to sandbox
Date Tue, 19 Aug 2014 02:59:18 GMT
Repository: airavata-sandbox
Updated Branches:
  refs/heads/master bcab933e1 -> 8f9229d5c


added job-throttler prototype to sandbox

GSOC 2014 project, prototype meta-scheduler to check submitted jobs
against running and queued jobs and resource queue limits.


Project: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/commit/463552e7
Tree: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/tree/463552e7
Diff: http://git-wip-us.apache.org/repos/asf/airavata-sandbox/diff/463552e7

Branch: refs/heads/master
Commit: 463552e77f83099547b232ac7d3aaacd290393dc
Parents: bcab933
Author: Scott McCaulay <scottmccaulay@gmail.com>
Authored: Mon Aug 18 16:28:50 2014 -0400
Committer: Suresh Marru <smarru@apache.org>
Committed: Mon Aug 18 22:57:05 2014 -0400

----------------------------------------------------------------------
 job-throttler/.classpath                        |   7 +
 job-throttler/.project                          |  17 ++
 .../.settings/org.eclipse.jdt.core.prefs        |  11 +
 job-throttler/bin/MetaScheduleTest.class        | Bin 0 -> 3025 bytes
 job-throttler/bin/MetaScheduler.class           | Bin 0 -> 6607 bytes
 job-throttler/src/MetaScheduleTest.java         | 105 +++++++++
 job-throttler/src/MetaScheduler.java            | 229 +++++++++++++++++++
 7 files changed, 369 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/463552e7/job-throttler/.classpath
----------------------------------------------------------------------
diff --git a/job-throttler/.classpath b/job-throttler/.classpath
new file mode 100644
index 0000000..4005922
--- /dev/null
+++ b/job-throttler/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" path="src"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
+	<classpathentry kind="lib" path="C:/Users/smccaula/.m2/repository/mysql/mysql-connector-java/5.1.31/mysql-connector-java-5.1.31.jar"/>
+	<classpathentry kind="output" path="bin"/>
+</classpath>

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/463552e7/job-throttler/.project
----------------------------------------------------------------------
diff --git a/job-throttler/.project b/job-throttler/.project
new file mode 100644
index 0000000..f1b4bd1
--- /dev/null
+++ b/job-throttler/.project
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>job-throttler</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/463552e7/job-throttler/.settings/org.eclipse.jdt.core.prefs
----------------------------------------------------------------------
diff --git a/job-throttler/.settings/org.eclipse.jdt.core.prefs b/job-throttler/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..7341ab1
--- /dev/null
+++ b/job-throttler/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,11 @@
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
+org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
+org.eclipse.jdt.core.compiler.compliance=1.7
+org.eclipse.jdt.core.compiler.debug.lineNumber=generate
+org.eclipse.jdt.core.compiler.debug.localVariable=generate
+org.eclipse.jdt.core.compiler.debug.sourceFile=generate
+org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
+org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
+org.eclipse.jdt.core.compiler.source=1.7

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/463552e7/job-throttler/bin/MetaScheduleTest.class
----------------------------------------------------------------------
diff --git a/job-throttler/bin/MetaScheduleTest.class b/job-throttler/bin/MetaScheduleTest.class
new file mode 100644
index 0000000..42be02b
Binary files /dev/null and b/job-throttler/bin/MetaScheduleTest.class differ

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/463552e7/job-throttler/bin/MetaScheduler.class
----------------------------------------------------------------------
diff --git a/job-throttler/bin/MetaScheduler.class b/job-throttler/bin/MetaScheduler.class
new file mode 100644
index 0000000..64cf687
Binary files /dev/null and b/job-throttler/bin/MetaScheduler.class differ

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/463552e7/job-throttler/src/MetaScheduleTest.java
----------------------------------------------------------------------
diff --git a/job-throttler/src/MetaScheduleTest.java b/job-throttler/src/MetaScheduleTest.java
new file mode 100644
index 0000000..f59f920
--- /dev/null
+++ b/job-throttler/src/MetaScheduleTest.java
@@ -0,0 +1,105 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.*;
+
+public class MetaScheduleTest {
+
+    public static void main(String[] args) {
+        ArrayList<String> experimentData = new ArrayList<String>(); 
+        ArrayList<String> statusData = new ArrayList<String>(); 
+        Random rand = new Random();
+        String hostID = "";
+        String queueID = "";
+        String gatewayID = "";
+        int submitCtr = 0;
+        int holdCtr = 0;
+        int errorCtr = 0;
+
+    	String sarg0 = args[0];
+    	if (sarg0.equals("clear")) {
+            System.out.println("Clearing Active Jobs");
+            MetaScheduler.clearActiveJobs();	
+    	}
+    
+    	if ((sarg0.equals("run")) || (sarg0.equals("end"))) {
+        	int runCount = Integer.parseInt(args[1]);     
+            int  n = rand.nextInt(100);
+        	hostID = "quarry.uits.iu.edu";
+        	if (n < 50) {
+            	hostID = "bigred2.uits.iu.edu";            	
+            }
+            queueID = "debug";
+            n = rand.nextInt(100);
+            if (n < 25) {
+        		queueID = "serial";            	
+            }            
+            n = rand.nextInt(100);
+            if (n < 25) {
+        		queueID = "normal";            	
+            }            
+            n = rand.nextInt(100);
+            if (n < 25) {
+        		queueID = "long";            	
+            }   
+            n = rand.nextInt(100);
+            gatewayID = "Gateway1";
+            n = rand.nextInt(100);
+            if (n < 50) {
+        		gatewayID = "Gateway2";            	
+            }          
+            long currentTime = ((long) System.currentTimeMillis()) / 99;
+            for (int i=0;i<runCount;i++) {
+                experimentData.add(hostID); 
+                experimentData.add(queueID);
+                experimentData.add(gatewayID);
+                experimentData.add(Long.toString(currentTime+i));
+        	}
+        	if (sarg0.equals("run")) {
+                System.out.println("Scheduling " + runCount + " jobs on "
+                		+ hostID + " : " + queueID + " for " + gatewayID);
+                statusData = MetaScheduler.submitThrottleJob(experimentData);  
+                // display results summarized
+                int dataNDX = 0;
+                while (dataNDX < statusData.size()) {
+                	if (statusData.get(dataNDX).equals("SUBMIT")) {
+                		submitCtr++;
+                	}
+                	if (statusData.get(dataNDX).equals("HOLD")) {
+                		holdCtr++;
+                	}
+                	if (statusData.get(dataNDX).equals("ERROR")) {
+                		errorCtr++;
+                	}	
+                	dataNDX++;
+                }
+                System.out.println("Job States: submit=" + submitCtr + "  hold="
+                		+ holdCtr + "  error=" + errorCtr);
+        	}
+        	if (sarg0.equals("end")) {
+                System.out.println("Changing Status of " + runCount + " jobs on "
+                		+ hostID + " : " + queueID + " for " + gatewayID);
+                MetaScheduler.changeJobStatus(experimentData);        	
+        	}
+    	}
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata-sandbox/blob/463552e7/job-throttler/src/MetaScheduler.java
----------------------------------------------------------------------
diff --git a/job-throttler/src/MetaScheduler.java b/job-throttler/src/MetaScheduler.java
new file mode 100644
index 0000000..02f7047
--- /dev/null
+++ b/job-throttler/src/MetaScheduler.java
@@ -0,0 +1,229 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.sql.*;
+import java.util.ArrayList;
+
+public class MetaScheduler {
+    
+    public static Connection mySQLConnection()  {
+        String jdbcDriver = "com.mysql.jdbc.Driver";
+        String jdbcUser="******";
+        String jdbcPwd="********";
+        String jdbcUrl="jdbc:mysql://rdc04.uits.iu.edu:3059/scheduler";
+        Connection connect = null;
+        try {
+        Class.forName(jdbcDriver).newInstance();
+        connect = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPwd);         
+        } 
+        
+        catch (Exception e) {
+            System.out.println("Connection to mysql Failed!");
+            e.printStackTrace();
+            return(null);      
+        }
+        return(connect);
+      }
+    
+    public static ArrayList<String> submitThrottleJob(ArrayList<String> experimentData)
{
+    	ArrayList<String> statusData = new ArrayList<String>(); 
+        String hostID = "";
+        String queueID = "";
+        String experimentID = "";
+        String gatewayID = "";
+        String jobStatus = "";
+        int activeJobs  = 0;
+        int queueLimit = 0;
+        
+        Connection conn = mySQLConnection();
+        
+        int dataNDX = 0;
+        while (dataNDX < experimentData.size()) {
+            hostID = experimentData.get(dataNDX);
+            queueID = experimentData.get(dataNDX+1);
+            gatewayID = experimentData.get(dataNDX+2);
+            experimentID = experimentData.get(dataNDX+3);
+            queueLimit = getQueueLimits(hostID,queueID,conn);
+            activeJobs = getActiveJobs(gatewayID, hostID, queueID, conn);
+            jobStatus = getJobStatus(activeJobs, queueLimit);
+            if (!updateActiveJobs(gatewayID, hostID, queueID, experimentID, jobStatus, conn))
+            	jobStatus = "ERROR";
+            statusData.add(jobStatus);
+            dataNDX += 4;
+       }
+        return(statusData);
+    }
+    
+    public static Boolean updateActiveJobs(String gatewayID, String hostID, 
+    		String queueID, String jobID, String jobStatus, Connection conn ) {	
+        try {
+            try {
+                String sql = "insert into activejobs "
+                		+ " (gatewayID, hostID, queueName, jobID, jobState) VALUES "
+                		+ " (?,?,?,?,?) ";
+                PreparedStatement insertSQL = conn.prepareStatement(sql);
+                insertSQL.setString(1, gatewayID);
+                insertSQL.setString(2, hostID);
+                insertSQL.setString(3, queueID);
+                insertSQL.setString(4, jobID);
+                insertSQL.setString(5, jobStatus);               
+                insertSQL.executeUpdate();
+
+            } finally {
+
+            }
+        } catch (SQLException e) {
+            return false;
+        }	
+    	return(true);
+    }
+    
+    public static String getJobStatus(int activeJobs, int queueLimit) {
+    	String jobStatus = "";
+    	if (queueLimit > activeJobs)
+    		jobStatus = "SUBMIT";
+    	else
+    		jobStatus = "HOLD";
+    	return(jobStatus);
+    }
+    
+    public static int getActiveJobs(String gatewayID, String hostID, String queueID, Connection
conn) {
+    	int activeJobs = 0;
+        try {
+            Statement statement = null;
+            try {
+                statement = conn.createStatement();
+                String sql = "select count(*) from activejobs where " +
+                " gatewayID = ? and hostID = ? and queueName = ?";
+                PreparedStatement updateSQL = conn.prepareStatement(sql);
+                updateSQL.setString(1, gatewayID);
+                updateSQL.setString(2, hostID);
+                updateSQL.setString(3, queueID);
+                ResultSet rs = updateSQL.executeQuery();
+                if (rs != null) {
+                	 rs.next();          
+                	 activeJobs = rs.getInt(1);                
+                	 rs.close();
+                }
+            } finally {
+                try {
+                    if (statement != null) {
+                        statement.close();
+                    }
+                } catch (SQLException e) {
+                    return -1;
+                }
+            }
+        } catch (SQLException e) {
+            return -1;
+        }
+        return activeJobs;
+    }    
+    
+    public static int getQueueLimits(String hostID, String queueID, Connection conn) {
+    	int queueLimit = 0;
+        try {
+            Statement statement = null;
+            try {
+                statement = conn.createStatement();
+                String sql = "select * from queuelimits where hostID = ? and queueName =
?";
+                PreparedStatement updateSQL = conn.prepareStatement(sql);
+                updateSQL.setString(1, hostID);
+                updateSQL.setString(2, queueID);
+                ResultSet rs = updateSQL.executeQuery();
+                if (rs != null) {
+                    while (rs.next()) {
+                          queueLimit = (Integer) rs.getObject("queueLimit");
+                        }                    
+                    rs.close();
+                }
+            } finally {
+                try {
+                    if (statement != null) {
+                        statement.close();
+                    }
+                } catch (SQLException e) {
+                    return -1;
+                }
+            }
+        } catch (SQLException e) {
+            return -1;
+        }
+        return queueLimit;
+    }
+    
+    public static Boolean clearActiveJobs() {	
+        try {
+        	Connection conn = mySQLConnection();
+            Statement statement = null;
+            try {
+                statement = conn.createStatement();
+                String sql = "delete from activejobs";
+                statement.executeUpdate(sql);
+            } finally {
+                try {
+                    if (statement != null) {
+                        statement.close();
+                    }
+                } catch (SQLException e) {
+                    return false;
+                }
+            }
+        } catch (SQLException e) {
+            return false;
+        }	
+    	return(true);
+    }
+    
+    public static Boolean changeJobStatus(ArrayList<String> experimentData) {	
+        String hostID = "";
+        String queueID = "";
+        String gatewayID = "";
+        int deleteCtr = (experimentData.size() + 1) / 4;
+        
+        int dataNDX = 0;
+        if (deleteCtr > 0) {
+            hostID = experimentData.get(dataNDX);
+            queueID = experimentData.get(dataNDX+1);
+            gatewayID = experimentData.get(dataNDX+2);        	
+        }
+        try {
+        	Connection conn = mySQLConnection();
+            try {
+                // not really meaningful, but useful for testing
+                String sql = "delete top (?) from activejobs where " +
+                		"hostID = ? and queueID = ? and gatewayID = ? ";
+                PreparedStatement updateSQL = conn.prepareStatement(sql);
+                updateSQL.setInt(1, deleteCtr);
+                updateSQL.setString(2, hostID);
+                updateSQL.setString(3, queueID);
+                updateSQL.setString(4, gatewayID);
+                updateSQL.executeQuery();
+            } finally {
+
+            }
+        } catch (SQLException e) {
+            return false;
+        }	
+    	return(true);
+    }
+}


Mime
View raw message