airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From patanac...@apache.org
Subject svn commit: r1167278 - in /incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac: context/invocation/InvocationContext.java context/invocation/impl/DefaultInvocationContext.java provider/utils/JobSubmissionListener.java
Date Fri, 09 Sep 2011 16:22:58 GMT
Author: patanachai
Date: Fri Sep  9 16:22:58 2011
New Revision: 1167278

URL: http://svn.apache.org/viewvc?rev=1167278&view=rev
Log:
Redesign concurrency in Gram job listener

Modified:
    incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/context/invocation/InvocationContext.java
    incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/context/invocation/impl/DefaultInvocationContext.java
    incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java

Modified: incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/context/invocation/InvocationContext.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/context/invocation/InvocationContext.java?rev=1167278&r1=1167277&r2=1167278&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/context/invocation/InvocationContext.java (original)
+++ incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/context/invocation/InvocationContext.java Fri Sep  9 16:22:58 2011
@@ -50,7 +50,7 @@ public interface InvocationContext {  
      * 
      * @param value
      */
-    public <T extends MessageContext<?>> void setInput(T value);
+    public void setInput(MessageContext<?> value);
 
     /**
      * Get output. Use to handle specific MessageContext for output
@@ -64,7 +64,7 @@ public interface InvocationContext {  
      * 
      * @param value
      */    
-    public <T extends MessageContext<?>> void setOutput(T value);
+    public void setOutput(MessageContext<?> value);
     
     /**
      * Get ExecutionDescription
@@ -79,7 +79,7 @@ public interface InvocationContext {  
      * 
      * @param value
      */
-    public <T extends ExecutionDescription> void setExecutionDescription(T value);
+    public void setExecutionDescription(ExecutionDescription value);
 
     /**
      * Get ExecutionContext
@@ -93,7 +93,7 @@ public interface InvocationContext {  
      * 
      * @param value
      */
-    public <T extends ExecutionContext> void setExecutionContext(T value);
+    public void setExecutionContext(ExecutionContext value);
 
     /**
      * Get MessageContext
@@ -109,7 +109,7 @@ public interface InvocationContext {  
      * @param name
      * @param value
      */
-    public <T extends MessageContext<?>> void addMessageContext(String name, T value);
+    public void addMessageContext(String name, MessageContext<?> value);
 
     /**
      * Get SecurityContext
@@ -125,6 +125,6 @@ public interface InvocationContext {  
      * @param name
      * @param value
      */
-    public <T extends SecurityContext> void addSecurityContext(String name, T value);
+    public void addSecurityContext(String name, SecurityContext value);
 
 }

Modified: incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/context/invocation/impl/DefaultInvocationContext.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/context/invocation/impl/DefaultInvocationContext.java?rev=1167278&r1=1167277&r2=1167278&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/context/invocation/impl/DefaultInvocationContext.java (original)
+++ incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/context/invocation/impl/DefaultInvocationContext.java Fri Sep  9 16:22:58 2011
@@ -59,7 +59,7 @@ public class DefaultInvocationContext im
         return this.gfacContext;
     }
     
-    public <T extends ExecutionDescription> void setExecutionDescription(T value) {
+    public void setExecutionDescription(ExecutionDescription value) {
         this.gfacContext = value;
     }
     
@@ -67,7 +67,7 @@ public class DefaultInvocationContext im
         return this.executionContext;
     }
     
-    public <T extends ExecutionContext> void setExecutionContext(T value) {
+    public void setExecutionContext(ExecutionContext value) {
         this.executionContext = value;        
     }
     
@@ -79,11 +79,11 @@ public class DefaultInvocationContext im
         return this.securityContextMap.get(name);
     }
     
-    public <T extends MessageContext<?>> void addMessageContext(String name, T value) {
+    public void addMessageContext(String name, MessageContext<?> value) {
         this.messageContextMap.put(name, value);
     }
 
-    public <T extends SecurityContext> void addSecurityContext(String name, T value) {
+    public void addSecurityContext(String name, SecurityContext value) {
         this.securityContextMap.put(name, value);        
     }
 
@@ -91,7 +91,7 @@ public class DefaultInvocationContext im
         return getMessageContext(MESSAGE_CONTEXT_INPUT);
     }
     
-    public <T extends MessageContext<?>> void setInput(T value) {        
+    public void setInput(MessageContext<?> value) {        
         this.messageContextMap.put(MESSAGE_CONTEXT_INPUT, value);
     }
     
@@ -99,7 +99,7 @@ public class DefaultInvocationContext im
         return getMessageContext(MESSAGE_CONTEXT_OUTPUT);
     }
 
-    public <T extends MessageContext<?>> void setOutput(T value) {
+    public void setOutput(MessageContext<?> value) {
         this.messageContextMap.put(MESSAGE_CONTEXT_OUTPUT, value);
         
     };    

Modified: incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java?rev=1167278&r1=1167277&r2=1167278&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java (original)
+++ incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java Fri Sep  9 16:22:58 2011
@@ -34,8 +34,8 @@ import org.slf4j.LoggerFactory;
 
 public class JobSubmissionListener implements GramJobListener {
 
-	public static final String MYPROXY_SECURITY_CONTEXT = "myproxy";
-	
+    public static final String MYPROXY_SECURITY_CONTEXT = "myproxy";
+
     private boolean finished;
     private int error;
     private int status;
@@ -48,72 +48,89 @@ public class JobSubmissionListener imple
         this.context = context;
     }
 
-    // waits for DONE or FAILED status
+    /**
+     * This method is used to block the process until the status of the job is
+     * DONE or FAILED
+     * 
+     * @throws InterruptedException
+     * @throws GSSException
+     * @throws GramException
+     * @throws SecurityException
+     */
     public void waitFor() throws InterruptedException, GSSException, GramException, SecurityException {
-        while (!finished) {
+        while (!isFinished()) {
             int proxyExpTime = job.getCredentials().getRemainingLifetime();
             if (proxyExpTime < 900) {
                 log.info("Job proxy expired. Trying to renew proxy");
-                GSSCredential gssCred = ((GSISecurityContext) context
-                        .getSecurityContext(MYPROXY_SECURITY_CONTEXT)).getGssCredentails();
+                GSSCredential gssCred = ((GSISecurityContext) context.getSecurityContext(MYPROXY_SECURITY_CONTEXT))
+                        .getGssCredentails();
                 job.renew(gssCred);
+                log.info("Myproxy renewed");
             }
-            // job status is changed but method isn't invoked
-            if (status != 0) {
-                if (job.getStatus() != status) {
-                    log.info("invoke method manually");
-                    statusChanged(job);
+
+            synchronized (this) {
+
+                /*
+                 * job status is changed but method isn't invoked
+                 */
+                if (status != 0) {
+                    if (job.getStatus() != status) {
+                        log.info("Change job status manually");
+                        if (setStatus(job.getStatus(), job.getError()))
+                            break;
+                    } else {
+                        log.info("job " + job.getIDAsString() + " have same status: "
+                                + GramJob.getStatusAsString(status));
+                    }
                 } else {
-                    log.info("job " + job.getIDAsString() + " have same status: " + GramJob.getStatusAsString(status));
+                    log.info("Status is zero");
                 }
-            } else {
-                log.info("Status is zero");
-            }
 
-            synchronized (this) {
                 wait(60 * 1000l);
             }
         }
     }
-    
-    public synchronized void statusChanged(GramJob job) {
-        int jobStatus = job.getStatus();
-        String jobId = job.getIDAsString();
-        String statusString = job.getStatusAsString();
-        String jobStatusMessage = formatJobStatus(jobId, statusString);
-        log.info(jobStatusMessage);
-        status = jobStatus;
-        context.getExecutionContext().getNotificationService().statusChanged(this, this.context, jobStatusMessage);
-        if (jobStatus == GramJob.STATUS_DONE) {
-            finished = true;
-        } else if (jobStatus == GramJob.STATUS_FAILED) {
-            finished = true;
-            error = job.getError();
+
+    private synchronized boolean isFinished() {
+        return this.finished;
+    }
+
+    private synchronized boolean setStatus(int status, int error) {
+        this.status = status;
+        this.error = error;
+
+        switch (this.status) {
+        case GramJob.STATUS_FAILED:
             log.info("Job Error Code: " + error);
+        case GramJob.STATUS_DONE:
+            this.finished = true;
         }
 
-        if (finished) {
-            notify();
+        return this.finished;
+    }
+
+    public void statusChanged(GramJob job) {
+        String jobStatusMessage = "Status of job " + job.getIDAsString() + "is " + job.getStatusAsString();
+        log.info(jobStatusMessage);
+
+        /*
+         * Notify status change
+         */
+        this.context.getExecutionContext().getNotificationService().statusChanged(this, this.context, jobStatusMessage);
+
+        /*
+         * Set new status if it is finished, notify all wait object
+         */
+        if (setStatus(job.getStatus(), job.getError())) {
+            notifyAll();
         }
     }
 
-    public int getError() {
+    public synchronized int getError() {
         return error;
     }
 
-    public int getStatus() {
+    public synchronized int getStatus() {
         return status;
     }
-    
-    private String formatJobStatus(String jobid, String jobstatus) {
-        return "Status of job " + jobid + "is " + jobstatus;
-    }
-
-    public void wakeup() {
-        try {
-            notify();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
 }



Mime
View raw message