falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From samar...@apache.org
Subject falcon git commit: FALCON-1040 Modifying ProcessInstanceStatusTest to expose job id for running jobs in Falcon. Contributed by Pragya M
Date Thu, 26 Feb 2015 05:58:25 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 594566c01 -> f63a47f51


FALCON-1040 Modifying ProcessInstanceStatusTest to expose job id for running jobs in Falcon.
Contributed by Pragya M


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f63a47f5
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f63a47f5
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f63a47f5

Branch: refs/heads/master
Commit: f63a47f51f5fb6b4ea1e9d2b1291733c18e47ab0
Parents: 594566c
Author: samarthg <samarthg@apacge.org>
Authored: Thu Feb 26 11:28:12 2015 +0530
Committer: samarthg <samarthg@apacge.org>
Committed: Thu Feb 26 11:28:12 2015 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |  3 +
 .../regression/core/util/InstanceUtil.java      | 20 +++++-
 .../falcon/regression/core/util/OozieUtil.java  | 18 +++++
 .../regression/ProcessInstanceStatusTest.java   | 69 ++++++++++++++++++--
 4 files changed, 104 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f63a47f5/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index ababa41..6e6ac7f 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -51,6 +51,9 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
+   
+   FALCON-1040 Modifying ProcessInstanceStatusTest to expose job id for running jobs in 
+   Falcon. (Pragya M via Samarth G)
 
    FALCON-1044 Add tests for the change that start and end are compulsory parameters for

    all instance POST apis.(Karishma G via Samarth)

http://git-wip-us.apache.org/repos/asf/falcon/blob/f63a47f5/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
index 3524355..5d4e657 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
@@ -202,7 +202,7 @@ public final class InstanceUtil {
         List<InstancesResult.WorkflowStatus> statuses = new ArrayList<InstancesResult.WorkflowStatus>();
         for (InstancesResult.Instance instance : instances) {
             final InstancesResult.WorkflowStatus status = instance.getStatus();
-            LOGGER.info("status: "+ status + ", instance: " + instance.getInstance());
+            LOGGER.info("status: " + status + ", instance: " + instance.getInstance());
             statuses.add(status);
         }
 
@@ -216,6 +216,24 @@ public final class InstanceUtil {
             killedCount, "Killed Instances");
     }
 
+    public static List<String> getWorkflowJobIds(InstancesResult instancesResult) {
+        InstancesResult.Instance[] instances = instancesResult.getInstances();
+        LOGGER.info("instances: " + Arrays.toString(instances));
+        Assert.assertNotNull(instances, "instances should be not null");
+        List<String> wfids = new ArrayList<String>();
+        for (InstancesResult.Instance instance : instances) {
+            LOGGER.warn("instance: " + instance + " , status: "
+                    + instance.getStatus() +  ", logs : " + instance.getLogFile());
+            if (instance.getStatus().name().equals("RUNNING") || instance.getStatus().name().equals("SUCCEEDED")) {
+                wfids.add(instance.getLogFile());
+            }
+            if (instance.getStatus().name().equals("KILLED") || instance.getStatus().name().equals("WAITING")) {
+                Assert.assertNull(instance.getLogFile());
+            }
+        }
+        return wfids;
+    }
+
     /**
      * Checks that expected number of failed instances matches actual number of failed ones.
      *

http://git-wip-us.apache.org/repos/asf/falcon/blob/f63a47f5/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
index d74864f..3cc171f 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -306,6 +306,24 @@ public final class OozieUtil {
         return workflowIds;
     }
 
+    public static List<String> getWorkflow(ColoHelper coloHelper, String bundleID)
+        throws OozieClientException {
+        OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient();
+        waitForCoordinatorJobCreation(oozieClient, bundleID);
+        List<String> workflowIds = new ArrayList<String>();
+        String coordId = InstanceUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID);
+        CoordinatorJob coordJobInfo = oozieClient.getCoordJobInfo(coordId);
+        for (CoordinatorAction action : coordJobInfo.getActions()) {
+            if (action.getStatus().name().equals("RUNNING") || action.getStatus().name().equals("SUCCEEDED")) {
+                workflowIds.add(action.getExternalId());
+            }
+            if (action.getStatus().name().equals("KILLED") || action.getStatus().name().equals("WAITING")) {
+                Assert.assertNull(action.getExternalId());
+            }
+        }
+        return workflowIds;
+    }
+
     public static Date getNominalTime(ColoHelper prismHelper, String bundleID)
         throws OozieClientException {
         OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient();

http://git-wip-us.apache.org/repos/asf/falcon/blob/f63a47f5/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
index e1e7dc1..b6440b8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
@@ -52,6 +52,8 @@ import org.testng.annotations.Test;
 import javax.xml.bind.JAXBException;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * Process instance status tests.
@@ -107,7 +109,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
     /**
      * time out is set as 3 minutes .... getStatus is for a large range in past.
      * 6 instance should be materialized and one in running and other in waiting
-     *
+     * Adding logging information test as part of FALCON-813.
+     * In case status does not contain jobId of instance the test should fail.
      * @throws Exception
      */
     @Test(groups = {"singleCluster"})
@@ -118,12 +121,16 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
         bundles[0].setProcessConcurrency(1);
         bundles[0].submitFeedsScheduleProcess(prism);
+        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, Status.RUNNING, EntityType.PROCESS);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
             "?start=2010-01-02T01:00Z&end=2010-01-02T10:20Z");
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
         InstanceUtil.validateResponse(r, 6, 1, 0, 5, 0);
+        List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
+        Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
     }
 
     /**
@@ -157,13 +164,15 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
         bundles[0].submitFeedsScheduleProcess(prism);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
-            "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z");
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z");
         InstanceUtil.validateResponse(r, 5, 0, 0, 5, 0);
     }
 
     /**
      * Schedule process and try to -getStatus without date parameters. Attempt should succeed. Start defaults
      * to start of entity and end defaults to end of entity.
+     * Adding logging information test as part of status information.
+     * In case status does not contain jobId of instance the test should fail.
      */
     @Test(groups = {"singleCluster"})
     public void testProcessInstanceStatusDateEmpty()
@@ -172,35 +181,47 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:30Z");
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
+        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 5,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null);
         InstanceUtil.validateResponse(r, 6, 5, 0, 1, 0);
+        List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
+        Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
     }
 
     /**
      * Schedule process with number of instances. Perform -getStatus request with valid
      * parameters. Attempt should succeed.
-     *
+     * Adding logging information test as part of status information.
+     * In case status does not contain jobId of instance the test should fail.
+    *
      * @throws Exception
      */
     @Test(groups = {"singleCluster"})
     public void testProcessInstanceStatusStartAndEnd() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
+        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1 ,
                 CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
             "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+        List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
+        Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
     }
 
     /**
      * Schedule process. Perform -getStatus using -start parameter which is out of process
      * validity range. Attempt should succeed, with start defaulted to entity start time.
+     * Adding logging information test as part of status information.
+     * In case status does not contain jobId of instance the test should fail.
      *
      * @throws Exception
      */
@@ -209,13 +230,17 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
         bundles[0].setProcessConcurrency(5);
         bundles[0].submitFeedsScheduleProcess(prism);
+        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
             "?start=2010-01-02T00:00Z&end=2010-01-02T01:21Z");
         InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+        List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
+        Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
     }
 
     /**
@@ -267,6 +292,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
     /**
      * Schedule process. Perform -getStatus using -start/-end parameters which are out of process
      * validity range. Attempt should succeed, with start/end defaulted to entity's start/end.
+     * Adding logging information test as part of status information.
+     * In case status does not contain jobId of instance the test should fail.
      *
      * @throws Exception
      */
@@ -276,13 +303,17 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setOutputFeedLocationData(feedOutputPath);
         bundles[0].setProcessConcurrency(2);
         bundles[0].submitFeedsScheduleProcess(prism);
+        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
             "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
         InstanceUtil.validateResponse(r, 5, 2, 0, 3, 0);
+        List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
+        Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
     }
 
     /**
@@ -316,19 +347,24 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
      * Schedule process. -getStatus of it's first instance using only -start parameter which
      * points to start time of process validity. Check that response reflects expected status of
      * instance.
-     *
+     * Adding logging information test as part of status information.
+     * In case status does not contain jobId of instance the test should fail.
      * @throws Exception
      */
     @Test(groups = {"singleCluster"})
     public void testProcessInstanceStatusOnlyStart() throws Exception {
         bundles[0].submitFeedsScheduleProcess(prism);
+        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
             "?start=2010-01-02T01:00Z");
         InstanceUtil.validateResponse(r, 5, 1, 0, 4, 0);
+        List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
+        Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
     }
 
     /**
@@ -348,7 +384,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
 
     /**
      * Schedule process. Try to -getStatus without time range parameters. Attempt succeeds.
-     *
+     * Adding logging information test as part of status information.
+     * In case status does not contain jobId of instance the test should fail.
      *
      * @throws Exception
      */
@@ -357,12 +394,16 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
         bundles[0].setProcessConcurrency(5);
         bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
         bundles[0].submitFeedsScheduleProcess(prism);
+        String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
         InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
+        List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId);
         InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null);
         InstanceUtil.validateResponse(r, 5, 5, 0, 0, 0);
+        List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r);
+        Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message");
     }
 
     /**
@@ -386,4 +427,22 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
             "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
         InstanceUtil.validateFailedInstances(r, 3);
     }
+
+    /*
+    * Function to match the workflows obtained from instance status and oozie.
+    */
+    private boolean matchWorkflows(List<String> instanceWf, List<String> oozieWf) {
+        Collections.sort(instanceWf);
+        Collections.sort(oozieWf);
+        if (instanceWf.size() != oozieWf.size()) {
+            return false;
+        }
+        for (int index = 0; index < instanceWf.size(); index++) {
+            if (!instanceWf.get(index).contains(oozieWf.get(index))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
 }


Mime
View raw message