oodt-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mattm...@apache.org
Subject oodt git commit: Fix for OODT-305: Make the Resource Manager manage different queues of jobs independently (resneck, mattmann)
Date Mon, 17 Jul 2017 04:37:32 GMT
Repository: oodt
Updated Branches:
  refs/heads/master d329e1ab9 -> 4f3588ce8


Fix for OODT-305: Make the Resource Manager manage different queues of jobs independently
(resneck, mattmann)


Project: http://git-wip-us.apache.org/repos/asf/oodt/repo
Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/4f3588ce
Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/4f3588ce
Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/4f3588ce

Branch: refs/heads/master
Commit: 4f3588ce8c9a455fb4efbd5db04009c8c91dd44b
Parents: d329e1a
Author: Chris Mattmann <mattmann@apache.org>
Authored: Sun Jul 16 21:37:26 2017 -0700
Committer: Chris Mattmann <mattmann@apache.org>
Committed: Sun Jul 16 21:37:26 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../resource/jobqueue/FifoMappedJobQueue.java   | 510 +++++++++++++++++++
 .../jobqueue/FifoMappedJobQueueFactory.java     |  75 +++
 .../cas/resource/jobqueue/MappedJobQueue.java   | 132 +++++
 .../apache/oodt/cas/resource/structs/Job.java   |  12 +
 .../oodt/cas/resource/structs/JobInput.java     |   8 +
 .../cas/resource/structs/NameValueJobInput.java |  17 +
 .../jobqueue/TestFifoMappedJobQueue.java        | 101 ++++
 8 files changed, 857 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c4be36b..7e4fc34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,8 @@ Apache OODT Change Log
 
 Release 1.1 - Current Development
 
+* OODT-305 Make the Resource Manager manage different queues of jobs independently (resneck, mattmann)
+
 * OODT-753 Move FM and WM Python APIs into "agility" component (kelly, mattmann)
 
 * OODT-563 Task editing and Workflow execution (varun, mattmann)

http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueue.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueue.java
b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueue.java
new file mode 100644
index 0000000..ac32fa6
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueue.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.jobqueue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.oodt.cas.resource.jobrepo.JobRepository;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.JobStatus;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+
+/**
+ * This implementation provides a "queue-aware" {@link JobQueue} that ensures
+ * the FIFO execution of jobs.
+ * 
+ * @author resneck
+ *
+ */
+public class FifoMappedJobQueue implements MappedJobQueue {
+
+  private Map<String, Vector<String>> queues;
+  private int maxQueueSize;
+  private JobRepository repo;
+  private static final Logger LOG = Logger
+      .getLogger(FifoMappedJobQueue.class.getName());
+
+  /**
+   * Creates a job queue that starts with no named queues; queues are created
+   * afterwards through {@link #addQueue(String)}.
+   *
+   * @param maxSize
+   *          the per-queue size limit consulted by {@link #addJob(JobSpec)}
+   * @param repo
+   *          the repository used to store and look up queued {@link JobSpec}s
+   */
+  public FifoMappedJobQueue(int maxSize, JobRepository repo) {
+    this.queues = new HashMap<String, Vector<String>>();
+    this.repo = repo;
+    this.maxQueueSize = maxSize;
+  }
+
+  /**
+   * Appends a job to the tail of the queue named by the job's queue name,
+   * stores it in the job repository and marks it {@link JobStatus#QUEUED}.
+   *
+   * @param spec
+   *          the job to enqueue
+   * @return the ID of the queued job
+   * @throws JobQueueException
+   *           if the spec or its job is null, the job's queue name is null or
+   *           unknown, the target queue is full, or the job repository
+   *           rejects the add or the status update
+   * @see JobQueue#addJob(JobSpec)
+   */
+  public synchronized String addJob(JobSpec spec) throws JobQueueException {
+
+    // Check if the job is null and if its queue exists
+    if (spec == null || spec.getJob() == null) {
+      throw new JobQueueException("A null job was given.");
+    }
+    String queueName = spec.getJob().getQueueName();
+    validateQueueName(queueName);
+
+    // Check if the job's queue is full. requeueJob() is not capacity-checked
+    // and can push a queue past the limit, so compare with >= rather than ==;
+    // otherwise an over-full queue would accept jobs without bound. A negative
+    // limit keeps its previous meaning of "never full".
+    List<String> queue = queues.get(queueName);
+    if (maxQueueSize >= 0 && queue.size() >= maxQueueSize) {
+      throw new JobQueueException(
+          "The queue " + queueName + " is full.  The job "
+              + spec.getJob().getId() + " could not be queued.");
+    }
+
+    // Add the job to the repository
+    try {
+      this.repo.addJob(spec);
+    } catch (JobRepositoryException e) {
+      throw new JobQueueException(
+          "An error occurred while adding job " + spec.getJob().getId()
+              + " to the job repository: " + e.getMessage());
+    }
+
+    // Add the job to the queue. The ID is read only after the repository
+    // call, in case the repository assigns it.
+    queue.add(spec.getJob().getId());
+
+    // Update the job's status
+    spec.getJob().setStatus(JobStatus.QUEUED);
+    try {
+      this.repo.updateJob(spec);
+    } catch (JobRepositoryException e) {
+      throw new JobQueueException("An error occurred while updating "
+          + "the status of job " + spec.getJob().getId()
+          + " in the job repository: " + e.getMessage());
+    }
+
+    LOG.log(Level.INFO,
+        "Job [id=" + spec.getJob().getId() + ",name=" + spec.getJob().getName()
+            + "] was added to the job queue in queue " + queueName);
+    return spec.getJob().getId();
+
+  }
+
+  /**
+   * Puts a job back at the head of its queue and marks it
+   * {@link JobStatus#QUEUED} in the job repository. The queue's size limit is
+   * not consulted here.
+   *
+   * @param spec
+   *          the job to place at the front of its queue
+   * @return the ID of the requeued job
+   * @throws JobQueueException
+   *           if the spec is null, its queue name is null or unknown, or the
+   *           repository fails to record the status change
+   * @see JobQueue#requeueJob(JobSpec)
+   */
+  public synchronized String requeueJob(JobSpec spec) throws JobQueueException {
+
+    if (spec == null) {
+      throw new JobQueueException("A null job was given.");
+    }
+
+    // Reject a null or unknown queue name before touching any queue
+    final String targetQueue = spec.getJob().getQueueName();
+    validateQueueName(targetQueue);
+
+    // Index 0 is the head of the queue, which getNextJob scans first
+    queues.get(targetQueue).add(0, spec.getJob().getId());
+
+    // Record the new status in the repository
+    spec.getJob().setStatus(JobStatus.QUEUED);
+    try {
+      this.repo.updateJob(spec);
+    } catch (JobRepositoryException e) {
+      throw new JobQueueException("An error occurred while updating "
+          + "the status of job " + spec.getJob().getId()
+          + " in the job repository: " + e.getMessage());
+    }
+
+    return spec.getJob().getId();
+
+  }
+
+  /**
+   * Collects the {@link JobSpec}s of every job currently held in any queue,
+   * queue by queue. Jobs whose spec cannot be fetched from the repository are
+   * logged and left out of the result.
+   *
+   * @return a list of the queued {@link JobSpec}s
+   * @see JobQueue#getQueuedJobs()
+   */
+  public synchronized List getQueuedJobs() {
+
+    List<JobSpec> queuedSpecs = new Vector<JobSpec>();
+    for (Vector<String> jobIds : queues.values()) {
+      for (String jobId : jobIds) {
+        try {
+          queuedSpecs.add(this.repo.getJobById(jobId));
+        } catch (JobRepositoryException e) {
+          LOG.log(Level.WARNING, "Failed to fetch JobSpec from repo: " + jobId);
+        }
+      }
+    }
+
+    return queuedSpecs;
+
+  }
+
+  /**
+   * Looks up the {@link JobSpec} of every job held in the named queue, in
+   * queue order.
+   *
+   * @param queueName
+   *          the queue whose jobs are wanted
+   * @return the queued {@link JobSpec}s of that queue
+   * @throws JobQueueException
+   *           if the queue name is null or unknown
+   * @throws JobRepositoryException
+   *           if a job cannot be fetched from the repository
+   */
+  public synchronized List<JobSpec> getQueuedJobs(String queueName)
+      throws JobQueueException, JobRepositoryException {
+
+    validateQueueName(queueName);
+
+    List<String> jobIds = this.queues.get(queueName);
+    List<JobSpec> specs = new Vector<JobSpec>();
+    for (int i = 0; i < jobIds.size(); i++) {
+      specs.add(this.repo.getJobById(jobIds.get(i)));
+    }
+
+    return specs;
+
+  }
+
+  /**
+   * Empties every queue. The queues themselves stay defined; only the job IDs
+   * they hold are dropped.
+   *
+   * @see JobQueue#purge()
+   */
+  public synchronized void purge() {
+
+    for (Vector<String> jobIds : this.queues.values()) {
+      jobIds.removeAllElements();
+    }
+
+  }
+
+  /**
+   * Reports whether no queue holds any job.
+   *
+   * @return true if the total number of queued jobs is zero
+   * @see JobQueue#isEmpty()
+   */
+  public synchronized boolean isEmpty() {
+
+    final boolean hasJobs = this.getSize() != 0;
+    return !hasJobs;
+
+  }
+
+  /**
+   * Reports whether the named queue holds no jobs.
+   *
+   * @param queueName
+   *          the queue to inspect
+   * @return true if that queue is empty
+   * @throws JobQueueException
+   *           if the queue name is null or unknown
+   */
+  public synchronized boolean isEmpty(String queueName)
+      throws JobQueueException {
+
+    validateQueueName(queueName);
+
+    Vector<String> jobIds = this.queues.get(queueName);
+    return jobIds.isEmpty();
+
+  }
+
+  /**
+   * Removes and returns the first job, across all queues, that is flagged as
+   * ready. Queues are visited in the map's iteration order and each queue is
+   * scanned from its head. The returned job is marked
+   * {@link JobStatus#SCHEDULED} in the job repository.
+   *
+   * @return the dequeued {@link JobSpec}, or null if no queued job is ready
+   * @throws RuntimeException
+   *           if no queues are defined or every queue is empty
+   * @see JobQueue#getNextJob()
+   */
+  public synchronized JobSpec getNextJob() {
+
+    // Check if any queues exist
+    if (this.queues.isEmpty()) {
+      throw new RuntimeException("No queues are defined.");
+    }
+
+    // Check if all queues are empty
+    if (this.isEmpty()) {
+      throw new RuntimeException("The queue contains no jobs.");
+    }
+
+    // Look in each queue for a job
+    for (List<String> queue : this.queues.values()) {
+
+      // Check jobs from the front of the queue until we find one that
+      // is flagged as ready to schedule
+      for (int index = 0; index < queue.size(); index++) {
+
+        String jobId = queue.get(index);
+        JobSpec spec;
+        try {
+          spec = this.repo.getJobById(jobId);
+        } catch (JobRepositoryException e) {
+          // The job cannot be inspected; leave it queued and try the next
+          // one instead of dereferencing a null spec.
+          LOG.log(Level.WARNING,
+              "Failed to fetch JobSpec from repo: " + jobId);
+          continue;
+        }
+
+        // Skip jobs that are unknown to the repository or not yet ready
+        if (spec == null || spec.getJob() == null
+            || !spec.getJob().getReady()) {
+          continue;
+        }
+
+        // Remove the job from the queue
+        queue.remove(index);
+
+        // Set the status of the fetched job
+        spec.getJob().setStatus(JobStatus.SCHEDULED);
+        try {
+          this.repo.updateJob(spec);
+        } catch (JobRepositoryException e) {
+          LOG.log(Level.WARNING,
+              "The status of job " + spec.getJob().getId()
+                  + " was not properly set "
+                  + "after being dequeued. Message: " + e.getMessage());
+        }
+
+        return spec;
+
+      }
+
+    }
+
+    return null;
+
+  }
+
+
+  /**
+   * Removes and returns the first job in the named queue that is flagged as
+   * ready, marking it {@link JobStatus#SCHEDULED} in the job repository. Jobs
+   * that are not ready stay in place.
+   *
+   * @param queueName
+   *          the queue to take the next job from
+   * @return the dequeued {@link JobSpec}, or null if the queue is empty or
+   *         holds no ready job
+   * @throws JobQueueException
+   *           if the queue name is null or unknown
+   * @throws JobRepositoryException
+   *           if a queued job cannot be fetched from the repository
+   */
+  public synchronized JobSpec getNextJob(String queueName)
+      throws JobQueueException, JobRepositoryException {
+
+    // Check if the given queue name is null and if it exists
+    validateQueueName(queueName);
+
+    // Check jobs from the front of the queue until we find one that
+    // is flagged as ready to schedule; an empty queue falls straight through
+    List<String> queue = queues.get(queueName);
+    for (int index = 0; index < queue.size(); index++) {
+
+      // Check how the job is flagged
+      String jobId = queue.get(index);
+      JobSpec spec = this.repo.getJobById(jobId);
+
+      // Skip jobs that are unknown to the repository or not yet ready
+      if (spec == null || spec.getJob() == null || !spec.getJob().getReady()) {
+        continue;
+      }
+
+      // Remove the job from the queue
+      queue.remove(index);
+
+      // Set the status of the fetched job
+      spec.getJob().setStatus(JobStatus.SCHEDULED);
+      try {
+        this.repo.updateJob(spec);
+      } catch (JobRepositoryException e) {
+        LOG.log(Level.WARNING,
+            "The status of job " + spec.getJob().getId()
+                + " was not properly set after being dequeued. Message: "
+                + e.getMessage());
+      }
+
+      return spec;
+
+    }
+
+    return null;
+
+  }
+
+  /**
+   * Gives access to the repository this queue stores its jobs in.
+   *
+   * @return the {@link JobRepository} passed to the constructor
+   * @see JobQueue#getJobRepository()
+   */
+  public synchronized JobRepository getJobRepository() {
+
+    final JobRepository repository = this.repo;
+    return repository;
+
+  }
+
+  /**
+   * Counts the jobs held across all queues.
+   *
+   * @return the sum of the sizes of every queue
+   * @see JobQueue#getSize()
+   */
+  public synchronized int getSize() {
+
+    int jobCount = 0;
+    for (Vector<String> jobIds : queues.values()) {
+      jobCount += jobIds.size();
+    }
+    return jobCount;
+
+  }
+
+  /**
+   * Counts the jobs held in the named queue.
+   *
+   * @param queueName
+   *          the queue to measure
+   * @return the number of jobs in that queue
+   * @throws JobQueueException
+   *           if the queue name is null or unknown
+   */
+  public synchronized int getSize(String queueName) throws JobQueueException {
+
+    validateQueueName(queueName);
+
+    Vector<String> jobIds = this.queues.get(queueName);
+    return jobIds.size();
+
+  }
+
+  /**
+   * Returns the size limit that applies to each individual queue. The value
+   * is fixed at construction time and is independent of how many queues are
+   * defined.
+   *
+   * @return the per-queue job limit given to the constructor
+   */
+  public int getCapacity() {
+
+    final int perQueueLimit = this.maxQueueSize;
+    return perQueueLimit;
+
+  }
+
+  /**
+   * Removes a job from the queue named by the job's queue name. If the job is
+   * not in that queue a warning is logged and nothing else happens. The job
+   * repository is not touched.
+   *
+   * TODO: expand this javadoc when the method can actually be used by the
+   * operator.
+   *
+   * @param spec
+   *          the job to remove
+   * @throws JobQueueException
+   *           if the spec is null or its queue name is null or unknown
+   */
+  public synchronized void removeJob(JobSpec spec) throws JobQueueException {
+
+    // Check if the job is null and if its queue exists
+    if (spec == null) {
+      throw new JobQueueException("A null job was given.");
+    }
+    String queueName = spec.getJob().getQueueName();
+    validateQueueName(queueName);
+
+    // Get the ID of the job
+    String id = spec.getJob().getId();
+
+    // Find the job in the queue and remove it
+    List<String> queue = this.queues.get(queueName);
+    int index = getIndexInQueue(id, queue);
+    if (index == -1) {
+      LOG.log(Level.WARNING, "No job with ID " + id + " could be removed "
+          + "since it was not found in the queue.");
+    } else {
+      queue.remove(index);
+    }
+
+  }
+
+  /**
+   * Defines a new, empty queue.
+   *
+   * @param queueName
+   *          the name of the queue to create
+   * @throws JobQueueException
+   *           if the name is null or a queue with that name already exists
+   */
+  public synchronized void addQueue(String queueName) throws JobQueueException {
+
+    // Check if queue name is null or already exists
+    if (queueName == null) {
+      throw new JobQueueException("A null queue name was given.");
+    }
+    if (queues.containsKey(queueName)) {
+      throw new JobQueueException("A queue with name " + queueName
+          + " could not be created as one " + "with that name already exists.");
+    }
+
+    // Add the new queue to our map (typed, to avoid an unchecked conversion)
+    this.queues.put(queueName, new Vector<String>());
+
+  }
+
+
+  /**
+   * Deletes the named queue. Any jobs still held in it are dropped from the
+   * queue after a warning is logged.
+   *
+   * @param queueName
+   *          the queue to delete
+   * @throws JobQueueException
+   *           if the queue name is null or unknown
+   */
+  public synchronized void removeQueue(String queueName)
+      throws JobQueueException {
+
+    validateQueueName(queueName);
+
+    // Tell the operator how many jobs are about to be lost
+    Vector<String> doomed = this.queues.get(queueName);
+    int pendingJobs = doomed.size();
+    if (pendingJobs > 0) {
+      LOG.log(Level.WARNING, "The queue being removed (" + queueName
+          + ") contains " + pendingJobs + " jobs.");
+    }
+
+    this.queues.remove(queueName);
+
+  }
+
+  /**
+   * Moves a queued job to the head of its queue. If the job is not in the
+   * queue named by its queue name a warning is logged and nothing else
+   * happens.
+   *
+   * @param spec
+   *          the job to move to the front
+   * @throws JobQueueException
+   *           if the spec is null or its queue name is null or unknown
+   */
+  public synchronized void promoteJob(JobSpec spec) throws JobQueueException {
+
+    // Check if the job is null and if its queue exists
+    if (spec == null) {
+      throw new JobQueueException("A null job was given.");
+    }
+    String queueName = spec.getJob().getQueueName();
+    validateQueueName(queueName);
+
+    // Get the ID of the job
+    String id = spec.getJob().getId();
+
+    // Find the job in the queue and move it to the front
+    List<String> queue = this.queues.get(queueName);
+    int index = getIndexInQueue(id, queue);
+    if (index == -1) {
+      LOG.log(Level.WARNING, "No job with ID " + id + " could be promoted "
+          + "since it was not found in the queue.");
+    } else {
+      queue.add(0, queue.remove(index));
+    }
+
+  }
+  
+  /**
+   * Lists the names of all defined queues.
+   *
+   * @return a fixed-size snapshot of the queue names, or an empty list if no
+   *         queues are defined
+   */
+  public synchronized List<String> getQueueNames() {
+    if (this.queues == null || this.queues.isEmpty()) {
+      // Typed empty list instead of the raw Collections.EMPTY_LIST
+      return Collections.<String>emptyList();
+    }
+    return Arrays.asList(this.queues.keySet().toArray(new String[0]));
+  }
+
+  /**
+   * Moves every queued job whose input metadata contains the given value
+   * under the given key to the head of its queue. Jobs that cannot be fetched
+   * from the repository, or that carry no input or no such key, are skipped.
+   *
+   * @param key
+   *          the metadata key to look up
+   * @param val
+   *          the value that must be among the key's values
+   * @throws JobQueueException
+   *           if promoting a matching job fails
+   * @throws JobRepositoryException
+   *           declared for interface compatibility; repository lookup
+   *           failures are logged and the affected job is skipped
+   */
+  public synchronized void promoteKeyValPair(String key, String val)
+      throws JobQueueException, JobRepositoryException {
+
+    List<JobSpec> specsToPromote = new Vector<JobSpec>();
+
+    // Collect first and promote afterwards: promoteJob() reorders the queues,
+    // which must not happen while they are being iterated.
+    for (List<String> queue : queues.values()) {
+      for (String jobId : queue) {
+        JobSpec spec;
+        try {
+          spec = this.repo.getJobById(jobId);
+        } catch (JobRepositoryException e) {
+          LOG.log(Level.WARNING, "Failed to fetch JobSpec from repo: " + jobId);
+          continue;
+        }
+        if (spec == null || spec.getIn() == null) {
+          continue;
+        }
+
+        // Each key maps to a list of values, so test membership; comparing the
+        // list itself to a String with equals() can never match.
+        Map<String, Vector<String>> metadata = spec.getIn().getMetadata();
+        Vector<String> values = metadata == null ? null : metadata.get(key);
+        if (values != null && values.contains(val)) {
+          specsToPromote.add(spec);
+        }
+      }
+    }
+
+    // Promote back to front so that matching jobs of the same queue keep
+    // their relative (FIFO) order once they are all at the head.
+    for (int i = specsToPromote.size() - 1; i >= 0; i--) {
+      promoteJob(specsToPromote.get(i));
+    }
+
+  }
+
+  /**
+   * This method checks if a given queue name is valid
+   * 
+   * @param queueName
+   *          The name of the queue to validate
+   * @throws JobQueueException
+   *           If the name is null or no queue with the given name exists
+   */
+  private void validateQueueName(String queueName) throws JobQueueException {
+
+    if (queueName == null) {
+      throw new JobQueueException("A null queue name was given.");
+    }
+    if (!queues.containsKey(queueName)) {
+      throw new JobQueueException(
+          "An invalid queue name was given: " + queueName);
+    }
+
+  }
+
+  /**
+   * Finds the position of a job ID within a queue.
+   *
+   * @param id
+   *          the job ID to look for
+   * @param queue
+   *          the queue to search, head first
+   * @return the index of the first entry equal to the ID, or -1 if the ID is
+   *         not in the queue
+   */
+  private int getIndexInQueue(String id, List<String> queue) {
+    int position = 0;
+    for (String queuedId : queue) {
+      if (queuedId.equals(id)) {
+        return position;
+      }
+      position++;
+    }
+    return -1;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueueFactory.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueueFactory.java
b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueueFactory.java
new file mode 100644
index 0000000..96a5e13
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/FifoMappedJobQueueFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.jobqueue;
+
+import java.io.File;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.oodt.cas.metadata.util.PathUtils;
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.jobqueue.JobQueueFactory;
+import org.apache.oodt.cas.resource.jobrepo.JobRepository;
+import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
+
+
+/**
+ * Factory that creates {@link FifoMappedJobQueue}s from Java system
+ * properties: one gives the maximum size of each queue and
+ * <code>resource.jobrepo.factory</code> names the factory class of the
+ * {@link JobRepository} backing the queue.
+ * 
+ * @author resneck
+ *
+ */
+public class FifoMappedJobQueueFactory implements JobQueueFactory {
+
+	/* per-queue size limit; -1 when the property is unset or invalid */
+	private int stackSize = -1;
+	private JobRepository repo;
+	
+	private static final Logger LOG =
+			Logger.getLogger(FifoMappedJobQueueFactory.class.getName());
+	
+	/**
+	 * Reads the configuration and creates the job repository. Failures are
+	 * logged rather than thrown, in which case the repository stays null.
+	 */
+	public FifoMappedJobQueueFactory() {
+		try{
+			// NOTE: the property key keeps its original (SMAP) namespace so
+			// that existing configurations continue to work.
+			String stackSizeStr = System.getProperty(
+					"gov.nasa.smap.spdm.resource.jobqueue.fifomappedjobqueue.maxstacksize");
+	
+			if (stackSizeStr != null) {
+				// A malformed size must not prevent the repository from being
+				// created, so it is handled on its own.
+				try {
+					stackSize = Integer.parseInt(stackSizeStr.trim());
+				} catch (NumberFormatException e) {
+					LOG.log(Level.WARNING, "Ignoring invalid maximum queue size ["
+							+ stackSizeStr + "]", e);
+				}
+			}
+		    
+			// Default to the in-memory repository factory; the previous
+			// default named a class outside of this code base.
+			String jobRepoFactoryClassStr = System.getProperty(
+					"resource.jobrepo.factory",
+					"org.apache.oodt.cas.resource.jobrepo.MemoryJobRepositoryFactory");
+			this.repo = GenericResourceManagerObjectFactory.
+					getJobRepositoryFromServiceFactory(jobRepoFactoryClassStr);
+		}catch(Exception e){
+			// Pass the exception along so the stack trace is not lost
+			LOG.log(Level.SEVERE, "An error occurred while creating a " +
+					"FifoMappedJobQueue: " + e.getMessage(), e);
+		}
+
+	}
+	
+	/**
+	 * Creates a queue with the configured size limit and job repository.
+	 * 
+	 * @return a new {@link FifoMappedJobQueue}
+	 * @see org.apache.oodt.cas.resource.jobqueue.JobQueueFactory#createQueue()
+	 */
+	public JobQueue createQueue() {
+		return new FifoMappedJobQueue(stackSize, repo);
+	}
+	
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/MappedJobQueue.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/MappedJobQueue.java
b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/MappedJobQueue.java
new file mode 100644
index 0000000..fc2c636
--- /dev/null
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/jobqueue/MappedJobQueue.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.jobqueue;
+
+import java.util.List;
+
+import org.apache.oodt.cas.resource.jobqueue.JobQueue;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
+import org.apache.oodt.cas.resource.structs.exceptions.JobRepositoryException;
+
+/**
+ * The interface for a {@link JobQueue} that is aware of the different queues in
+ * the ResourceManager and allows for the manipulation of jobs on a
+ * queue-by-queue basis.
+ * 
+ * @author resneck
+ *
+ */
+public interface MappedJobQueue extends JobQueue {
+
+  /**
+   * Returns a boolean value representing whether or not the job queue contains
+   * jobs in the given queue.
+   * 
+   * @param queueName
+   *          The name of the queue which you are checking.
+   * @return true, if the queue is empty, false otherwise.
+   * @throws JobQueueException
+   *           If the given queue name is null or if no queue with that name
+   *           exists.
+   */
+  public boolean isEmpty(String queueName) throws JobQueueException;
+
+  /**
+   * Gets the next {@link JobSpec} in the queue with the given name.
+   * 
+   * @param queueName
+   *          The name of the queue from which the next {@link JobSpec} will be
+   *          returned.
+   * @return The next {@link JobSpec} from the jobqueue belonging to the queue
+   *         with the given name.
+   * @throws JobQueueException
+   *           If the given queue name is null or if no queue with that name
+   *           exists.
+   * @throws JobRepositoryException
+   *           If a queued job cannot be read from the job repository.
+   */
+  public JobSpec getNextJob(String queueName)
+      throws JobQueueException, JobRepositoryException;
+
+  /**
+   * Gets the number of jobs in the queue with the given name.
+   * 
+   * @param queueName
+   *          The name of the queue whose size will be given.
+   * @return The number of {@link JobSpec}s in the queue with the given name.
+   * @throws JobQueueException
+   *           If the given queue name is null or if no queue with that name
+   *           exists.
+   */
+  public int getSize(String queueName) throws JobQueueException;
+
+  /**
+   * Removes the given {@link JobSpec} from the queue.
+   * 
+   * @param spec
+   *          The {@link JobSpec} that will be removed.
+   * @throws JobQueueException
+   *           If the job cannot be removed. NOTE(review): the original text
+   *           said "if no JobSpec has the given ID"; confirm the intended
+   *           contract against the implementations.
+   */
+  public void removeJob(JobSpec spec) throws JobQueueException;
+
+  /**
+   * Add a queue with the given name.
+   * 
+   * @param queueName
+   *          The name of the queue to be created.
+   * @throws JobQueueException
+   *           If the given queue name is null or if a queue with the given name
+   *           already exists.
+   */
+  public void addQueue(String queueName) throws JobQueueException;
+
+  /**
+   * Remove the queue with the given name.
+   * 
+   * @param queueName
+   *          The name of the queue to be removed.
+   * @throws JobQueueException
+   *           If the given queue name is null or if no queue with the given
+   *           name exists.
+   */
+  public void removeQueue(String queueName) throws JobQueueException;
+
+  /**
+   * Gets a list of all queued jobs that belong to the queue with the given
+   * name.
+   * 
+   * @param queueName
+   *          The name of the queue whose members will be given.
+   * @return A {@link List} of queued {@link JobSpec}s from the given queue.
+   * @throws JobQueueException
+   *           If the given queue name is null or if no queue with the given
+   *           name exists.
+   * @throws JobRepositoryException
+   *           If a queued job cannot be read from the job repository.
+   */
+  public List<JobSpec> getQueuedJobs(String queueName)
+      throws JobQueueException, JobRepositoryException;
+
+  /**
+   * Adds the given {@link JobSpec} to the job queue.
+   * 
+   * @param spec
+   *          The {@link JobSpec} to add.
+   * @return A String identifying the added job (presumably its job ID; confirm
+   *         against the implementations).
+   * @throws JobQueueException
+   *           If the job cannot be added.
+   */
+  public String addJob(JobSpec spec) throws JobQueueException;
+
+  /**
+   * Puts the given {@link JobSpec} back into the job queue.
+   * 
+   * @param spec
+   *          The {@link JobSpec} to requeue.
+   * @return A String identifying the requeued job (presumably its job ID;
+   *         confirm against the implementations).
+   * @throws JobQueueException
+   *           If the job cannot be requeued.
+   */
+  public String requeueJob(JobSpec spec) throws JobQueueException;
+
+  /**
+   * Promotes the given {@link JobSpec} within its queue.
+   * 
+   * @param spec
+   *          The {@link JobSpec} to promote.
+   * @throws JobQueueException
+   *           If the job cannot be promoted.
+   */
+  public void promoteJob(JobSpec spec) throws JobQueueException;
+
+  /**
+   * Promotes the queued jobs selected by the given key and value. NOTE(review):
+   * the name suggests the pair is matched against each job's input metadata;
+   * confirm against the implementations.
+   * 
+   * @param key
+   *          The key used to select jobs.
+   * @param val
+   *          The value used to select jobs.
+   * @throws JobQueueException
+   *           If a selected job cannot be promoted.
+   * @throws JobRepositoryException
+   *           If a queued job cannot be read from the job repository.
+   */
+  public void promoteKeyValPair(String key, String val)
+      throws JobQueueException, JobRepositoryException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/structs/Job.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/Job.java b/resource/src/main/java/org/apache/oodt/cas/resource/structs/Job.java
index 88d857a..a947c0c 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/structs/Job.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/structs/Job.java
@@ -49,6 +49,9 @@ public class Job {
 
     /* the status of this job */
     private String status = null;
+    
+    /* whether this job is ready to be scheduled (defaults to true) */
+    private boolean ready = true;
 
     /**
      * Default Constructor.
@@ -77,6 +80,7 @@ public class Job {
         this.jobInputClassName = jobInputClassName;
         this.loadValue = loadValue;
         this.queueName = queueName;
+        this.ready = true;
     }
 
     /**
@@ -169,5 +173,13 @@ public class Job {
     public void setStatus(String status) {
         this.status = status;
     }
+    
+    /**
+     * @return the current value of this job's ready flag.
+     */
+    public boolean getReady(){
+      return ready;
+    }
+  
+    /**
+     * @param readyValue
+     *            the new value of this job's ready flag.
+     */
+    public void setReady(boolean readyValue){
+      ready = readyValue;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobInput.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobInput.java b/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobInput.java
index fc6011d..f307bc6 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobInput.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/structs/JobInput.java
@@ -18,6 +18,11 @@
 
 package org.apache.oodt.cas.resource.structs;
 
+
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.oodt.cas.metadata.Metadata;
 //OODT imports
 import org.apache.oodt.cas.resource.util.Configurable;
 import org.apache.oodt.cas.resource.util.XmlRpcWriteable;
@@ -39,4 +44,7 @@ public interface JobInput extends XmlRpcWriteable, Configurable {
    */
   String getId();
 
+  /**
+   * @return this input's metadata as a map from each key to the list of
+   *         values stored under it.
+   */
+  Map<String, Vector<String>> getMetadata();
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
----------------------------------------------------------------------
diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
b/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
index 73c26ab..0195c9c 100644
--- a/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
+++ b/resource/src/main/java/org/apache/oodt/cas/resource/structs/NameValueJobInput.java
@@ -20,8 +20,11 @@ package org.apache.oodt.cas.resource.structs;
 
 //JDK imports
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Vector;
 
 /**
  * @author mattmann
@@ -114,4 +117,18 @@ public class NameValueJobInput implements JobInput {
     }
   }
 
+  /**
+   * Exposes the name/value pairs of this input as metadata, with each
+   * property name mapped to a single-element list holding its value.
+   *
+   * @return a new map built from the properties; empty if there are none
+   */
+  @Override
+  public Map<String, Vector<String>> getMetadata() {
+    Map<String, Vector<String>> met = new HashMap<String, Vector<String>>();
+    if (props != null) {
+      // Iterate the property names; iterating values() and then looking each
+      // value up as a key maps values to null instead of names to values.
+      for (String keyName : props.stringPropertyNames()) {
+        Vector<String> vals = new Vector<String>();
+        vals.add(props.getProperty(keyName));
+        met.put(keyName, vals);
+      }
+    }
+    return met;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/oodt/blob/4f3588ce/resource/src/test/java/org/apache/oodt/cas/resource/jobqueue/TestFifoMappedJobQueue.java
----------------------------------------------------------------------
diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/jobqueue/TestFifoMappedJobQueue.java
b/resource/src/test/java/org/apache/oodt/cas/resource/jobqueue/TestFifoMappedJobQueue.java
new file mode 100644
index 0000000..cbcfb12
--- /dev/null
+++ b/resource/src/test/java/org/apache/oodt/cas/resource/jobqueue/TestFifoMappedJobQueue.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.resource.jobqueue;
+
+import org.apache.oodt.cas.resource.jobrepo.JobRepository;
+import org.apache.oodt.cas.resource.jobrepo.MemoryJobRepository;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.JobSpec;
+
+import junit.framework.TestCase;
+
+/**
+ * Unit tests for {@link FifoMappedJobQueue}. Every test starts from a fresh
+ * queue with a per-queue capacity of 2 and three empty queues
+ * (queue0..queue2); the backing repository is shared by all tests.
+ */
+public class TestFifoMappedJobQueue extends TestCase {
+
+  private static final JobRepository repo = new MemoryJobRepository();
+
+  private FifoMappedJobQueue q;
+
+  private final JobSpec[] jobs = new JobSpec[4];
+
+  public TestFifoMappedJobQueue() {
+  }
+
+  // Exceptions are left to propagate so that JUnit reports them with their
+  // full stack trace instead of just a message.
+  protected void setUp() throws Exception {
+    q = new FifoMappedJobQueue(2, repo);
+    jobs[0] = new JobSpec(null,
+        new Job("job0q0", "j0", null, null, "queue0", Integer.valueOf(1)));
+    jobs[1] = new JobSpec(null,
+        new Job("job0q1", "j1", null, null, "queue1", Integer.valueOf(1)));
+    jobs[2] = new JobSpec(null,
+        new Job("job1q0", "j2", null, null, "queue0", Integer.valueOf(1)));
+    jobs[3] = new JobSpec(null,
+        new Job("job2q0", "j3", null, null, "queue0", Integer.valueOf(1)));
+
+    q.addQueue("queue0");
+    q.addQueue("queue1");
+    q.addQueue("queue2");
+  }
+
+  public void testGetNextJob() throws Exception {
+    // Ensure that requesting a job from a valid, empty queue returns a null job
+    // spec
+    assertNull(q.getNextJob("queue2"));
+  }
+
+  public void testPurge() throws Exception {
+    q.addJob(this.jobs[0]);
+    q.addJob(this.jobs[1]);
+    q.purge();
+    assertEquals(0, q.getSize());
+  }
+
+  public void testIsEmpty() throws Exception {
+    for (String queueName : q.getQueueNames()) {
+      assertTrue(q.isEmpty(queueName));
+    }
+  }
+
+  public void testGetJobRepository() {
+    JobRepository r = q.getJobRepository();
+    assertSame(repo, r);
+  }
+
+  public void testGetCapacity() {
+    assertEquals(2, q.getCapacity());
+  }
+}
\ No newline at end of file


Mime
View raw message