falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From samar...@apache.org
Subject [1/2] falcon git commit: FALCON-1215 Adding new test cases related to rerun feature. Contributed by Pragya M
Date Mon, 08 Jun 2015 06:23:10 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 207443e9d -> 28fd15c49


FALCON-1215 Adding new test cases related to rerun feature. Contributed by Pragya M


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/041de1de
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/041de1de
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/041de1de

Branch: refs/heads/master
Commit: 041de1def1e88430534dd9ced63f983bf711b351
Parents: 207443e
Author: samarthg <samarthg@apacge.org>
Authored: Mon Jun 8 11:49:31 2015 +0530
Committer: samarthg <samarthg@apacge.org>
Committed: Mon Jun 8 11:49:31 2015 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   3 +
 .../falcon/regression/core/util/OSUtil.java     |   4 +
 .../falcon/regression/core/util/OozieUtil.java  |  47 +++++++-
 .../regression/ProcessInstanceRerunTest.java    | 112 +++++++++++++++++--
 .../MultipleActionWorkflow/workflow.xml         |  75 +++++++++++++
 5 files changed, 230 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/041de1de/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index fa0ac0c..189277b 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   
+   FALCON-1215 Adding new test cases related to rerun feature (Pragya M via Samarth Gupta)
+
    FALCON-1249 Tests for process setup wizard (Namit Maheshwari and Paul Isaychuk)
 
    FALCON-1242 Search UI test for entity upload button (Namit Maheshwari)

http://git-wip-us.apache.org/repos/asf/falcon/blob/041de1de/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java
index ab27ccf..d1f1c24 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java
@@ -47,6 +47,10 @@ public final class OSUtil {
 
     public static final String OOZIE_LIB_FOLDER =
             String.format(RESOURCES + "oozieLib%s", SEPARATOR);
+    public static final String MULTIPLE_ACTION_WORKFLOW =
+            String.format(RESOURCES + "MultipleActionWorkflow%s", SEPARATOR);
+    public static final String PIG_DIR =
+            String.format(RESOURCES + "pig%s", SEPARATOR);
 
 
     public static String getPath(String... pathParts) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/041de1de/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
index 15b0497..ef7d887 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -23,11 +23,12 @@ import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper;
 import org.apache.oozie.client.AuthOozieClient;
 import org.apache.oozie.client.BundleJob;
-import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.OozieClientException;
 import org.apache.oozie.client.Job;
+import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowAction;
 import org.joda.time.DateTime;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTimeZone;
@@ -49,6 +50,8 @@ import java.util.TreeMap;
  * helper methods for oozie .
  */
 public final class OozieUtil {
+
+    public static final String FAIL_MSG = "NO_such_workflow_exists";
     private OozieUtil() {
         throw new AssertionError("Instantiating utility class...");
     }
@@ -705,4 +708,46 @@ public final class OozieUtil {
             return OSUtil.IS_WINDOWS ? 60 : 30;
         }
     }
+
+    public static String getActionStatus(OozieClient oozieClient, String workflowId, String actionName)
+        throws OozieClientException {
+        List<WorkflowAction> wfAction = oozieClient.getJobInfo(workflowId).getActions();
+        for (WorkflowAction wf : wfAction) {
+            if (wf.getName().contains(actionName)) {
+                return wf.getExternalStatus();
+            }
+        }
+        return "";
+    }
+
+    public static String getWorkflowActionStatus(OozieClient oozieClient, String bundleId, String actionName)
+        throws OozieClientException {
+        List<String> workflowIds = getWorkflowJobs(oozieClient, bundleId);
+        if (workflowIds.get(0).isEmpty()) {
+            return FAIL_MSG;
+        }
+        return getActionStatus(oozieClient, workflowIds.get(0), actionName);
+    }
+
+    public static String getSubWorkflowActionStatus(OozieClient oozieClient, String bundleId,
+                                                    String actionName, String subAction)
+        throws OozieClientException {
+        List<String> workflowIds = getWorkflowJobs(oozieClient, bundleId);
+        if (workflowIds.get(0).isEmpty()) {
+            return FAIL_MSG;
+        }
+
+        String wid="";
+        List<WorkflowAction> wfAction = oozieClient.getJobInfo(workflowIds.get(0)).getActions();
+        for (WorkflowAction wf : wfAction) {
+            if (wf.getName().contains(actionName)) {
+                wid = wf.getExternalId();
+            }
+        }
+
+        if (!wid.isEmpty()) {
+            return getActionStatus(oozieClient, wid, subAction);
+        }
+        return FAIL_MSG;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/041de1de/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
index f65f9c9..ef3d8a7 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.regression;
 
+import org.apache.falcon.entity.v0.process.*;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
@@ -51,6 +52,7 @@ import java.util.List;
 @Test(groups = "embedded")
 public class ProcessInstanceRerunTest extends BaseTestClass {
 
+    private boolean restartRequired;
     private String baseTestDir = cleanAndGetTestDir();
     private String aggregateWorkflowDir = baseTestDir + "/aggregator";
     private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN;
@@ -111,7 +113,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4);
         List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName);
         prism.getProcessHelper().getProcessInstanceRerun(processName,
-            start + "&end=2010-01-02T01:11Z");
+                start + "&end=2010-01-02T01:11Z");
         InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 5, 1, 0);
     }
 
@@ -203,7 +205,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3,
-            CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
+                CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
         InstancesResult r = prism.getProcessHelper()
             .getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z");
         InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
@@ -234,7 +236,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
         List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName);
         prism.getProcessHelper().getProcessInstanceRerun(processName,
-            start + "&end=2010-01-02T01:11Z");
+                start + "&end=2010-01-02T01:11Z");
         TimeUtil.sleepSeconds(TIMEOUT);
         InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 6, 0, 0);
     }
@@ -253,7 +255,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
-            CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
+                CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
         prism.getProcessHelper().getProcessInstanceKill(processName,
                 start + "&end=2010-01-02T01:01Z");
         String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.KILLED).get(0);
@@ -282,7 +284,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.RUNNING,
             Status.SUCCEEDED).get(0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 0, CoordinatorAction
-            .Status.SUCCEEDED, EntityType.PROCESS);
+                .Status.SUCCEEDED, EntityType.PROCESS);
         prism.getProcessHelper().getProcessInstanceRerun(processName,
                 start + "&end=2010-01-02T01:01Z&force=true");
         Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
@@ -305,9 +307,9 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
             CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
         prism.getProcessHelper().getProcessInstanceSuspend(processName,
-            start + "&end=2010-01-02T01:06Z");
+                start + "&end=2010-01-02T01:06Z");
         prism.getProcessHelper().getProcessInstanceRerun(processName,
-            start + "&end=2010-01-02T01:06Z");
+                start + "&end=2010-01-02T01:06Z");
         Assert.assertEquals(InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 1),
                 CoordinatorAction.Status.SUSPENDED);
     }
@@ -326,7 +328,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
         OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+                CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
         List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName);
         prism.getProcessHelper().getProcessInstanceRerun(processName,
                 start + "&end=2010-01-02T01:11Z&force=true");
@@ -351,9 +353,99 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
         InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
                 CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS);
         prism.getProcessHelper().getProcessInstanceRerun(processName,
-            start + "&end=2010-01-02T01:11Z");
+                start + "&end=2010-01-02T01:11Z");
         s = InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 0);
         Assert.assertEquals(s, CoordinatorAction.Status.WAITING,
-            "instance should have been in WAITING state");
+                "instance should have been in WAITING state");
+    }
+
+    @Test(groups = {"singleCluster"}, timeOut = 1200000)
+    public void testProcessInstanceRerunFailedPostProcessing() throws Exception {
+        restartRequired=true;
+        bundles[0].setProcessValidity("2015-01-02T01:00Z", "2015-01-02T01:04Z");
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].submitFeedsScheduleProcess(prism);
+
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
+
+        //bring down Server1 colo
+        Util.shutDownService(cluster.getClusterHelper());
+
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+
+        //wait for instance to go in killing state
+        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
+                CoordinatorAction.Status.KILLED, EntityType.PROCESS, 5);
+
+        Assert.assertEquals(OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "post-processing")
+                .contains("KILLED"), true);
+        Assert.assertEquals(OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "user-action")
+                .contains("SUCCEEDED"), true);
+
+        //start Server1 colo
+        Util.startService(cluster.getClusterHelper());
+        TimeUtil.sleepSeconds(10);
+
+        prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2015-01-02T01:00Z&end=2015-01-02T01:04Z");
+
+        while (!OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "post-processing").contains("SUCCEEDED")) {
+            TimeUtil.sleepSeconds(10);
+        }
+    }
+
+    @Test(groups = {"singleCluster"}, timeOut = 1200000)
+    public void testProcessInstanceRerunFailedWorkflowAction() throws Exception {
+
+        // Defining path to be used in pig script
+        String propPath = cleanAndGetTestDir() + "/rerun";
+        org.apache.falcon.entity.v0.process.Process processElement = bundles[0].getProcessObject();
+        Properties properties = new Properties();
+        Property propertyInput = new Property();
+        propertyInput.setName("inputPath");
+        propertyInput.setValue(propPath);
+
+        Property propertyOutput = new Property();
+        propertyOutput.setName("outputPath");
+        propertyOutput.setValue(propPath + "/output");
+        properties.getProperties().add(propertyInput);
+        properties.getProperties().add(propertyOutput);
+        processElement.setProperties(properties);
+        bundles[0].setProcessData(processElement.toString());
+
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.MULTIPLE_ACTION_WORKFLOW);
+        HadoopUtil.copyDataToFolder(clusterFS, aggregateWorkflowDir, OSUtil.PIG_DIR + "id.pig");
+
+        bundles[0].setProcessValidity("2015-01-02T01:00Z", "2015-01-02T01:04Z");
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].submitFeedsScheduleProcess(prism);
+
+        String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS);
+
+        InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0);
+
+        OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+
+        //wait for instance to get killed
+        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
+                CoordinatorAction.Status.KILLED, EntityType.PROCESS, 5);
+
+        Assert.assertEquals(OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "user-action")
+                .contains("KILLED"), true);
+        Assert.assertEquals(OozieUtil.getSubWorkflowActionStatus(clusterOC, bundleId, "user-action", "pig")
+                .contains("KILLED"), true);
+        Assert.assertEquals(OozieUtil.getSubWorkflowActionStatus(clusterOC, bundleId, "user-action", "aggregator")
+                .contains("SUCCEEDED"), true);
+
+        HadoopUtil.uploadDir(clusterFS, propPath, OSUtil.MULTIPLE_ACTION_WORKFLOW);
+
+        prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2015-01-02T01:00Z&end=2015-01-02T01:04Z");
+
+        while (!OozieUtil.getSubWorkflowActionStatus(clusterOC, bundleId, "user-action", "pig").contains("SUCCEEDED")) {
+            TimeUtil.sleepSeconds(10);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/041de1de/falcon-regression/merlin/src/test/resources/MultipleActionWorkflow/workflow.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/MultipleActionWorkflow/workflow.xml b/falcon-regression/merlin/src/test/resources/MultipleActionWorkflow/workflow.xml
new file mode 100644
index 0000000..8733f90
--- /dev/null
+++ b/falcon-regression/merlin/src/test/resources/MultipleActionWorkflow/workflow.xml
@@ -0,0 +1,75 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.2" name="aggregator-wf">
+    <start to="aggregator"/>
+    <action name="aggregator">
+        <map-reduce>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <prepare>
+                <delete path="${outputData}"/>
+            </prepare>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>mapred.mapper.class</name>
+                    <value>org.apache.hadoop.mapred.lib.IdentityMapper</value>
+                </property>
+                <property>
+                    <name>mapred.reducer.class</name>
+                    <value>org.apache.hadoop.mapred.lib.IdentityReducer</value>
+                </property>
+                <property>
+                    <name>mapred.map.tasks</name>
+                    <value>1</value>
+                </property>
+                <property>
+                    <name>mapred.input.dir</name>
+                    <value>${inputData}</value>
+                </property>
+                <property>
+                    <name>mapred.output.dir</name>
+                    <value>${outputData}</value>
+                </property>
+            </configuration>
+        </map-reduce>
+        <ok to="pigAction"/>
+        <error to="failMapRed"/>
+    </action>
+
+    <action name="pigAction">
+        <pig>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <script>id.pig</script>
+            <param>INPUT=${inputPath}</param>
+            <param>OUTPUT=${outputPath}</param>
+        </pig>
+        <ok to="end"/>
+        <error to="failPig"/>
+    </action>
+
+    <kill name="failMapRed">
+        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <kill name="failPig">
+        <message>Pig action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>


Mime
View raw message