falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From samar...@apache.org
Subject [2/2] falcon git commit: FALCON-1227 Add logMover check in FeedReplication test. Contributed by Pragya M
Date Mon, 08 Jun 2015 06:23:11 GMT
FALCON-1227 Add logMover check in FeedReplication test. Contributed by Pragya M


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/28fd15c4
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/28fd15c4
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/28fd15c4

Branch: refs/heads/master
Commit: 28fd15c496f8175d242ab2f5b783aa41e1c44baf
Parents: 041de1d
Author: samarthg <samarthg@apacge.org>
Authored: Mon Jun 8 11:52:44 2015 +0530
Committer: samarthg <samarthg@apacge.org>
Committed: Mon Jun 8 11:52:44 2015 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   3 +
 .../falcon/regression/core/util/AssertUtil.java |  40 +++++++
 .../falcon/regression/FeedReplicationTest.java  | 120 +++++++++++++++----
 .../apache/falcon/regression/LogMoverTest.java  |  28 +----
 4 files changed, 141 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/28fd15c4/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 189277b..3cd811f 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -92,6 +92,9 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
+
+   FALCON-1227 Add logMover check in FeedReplication test(Pragya M via Samarth G)
+
    FALCON-1258 Fix feed validity and fortify ELExpFutureAndLatestTest (Ruslan Ostafiychuk)
 
    FALCON-1253 Fortify ExternalFSTest (Ruslan Ostafiychuk)

http://git-wip-us.apache.org/repos/asf/falcon/blob/28fd15c4/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
index 3abca7a..095e6f4 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
 import org.apache.falcon.regression.core.response.ServiceResponse;
 import org.apache.falcon.regression.core.supportClasses.ExecResult;
 import org.apache.falcon.resource.APIResult;
@@ -409,4 +410,43 @@ public final class AssertUtil {
             Assert.fail(String.format("%s expected non-empty string found [%s]", message,
str));
         }
     }
+
+    /**
+     * Checks that job logs are copied to user defined cluster staging path.
+     *
 +     * @param logFlag     denotes whether it is a failed/succeeded log
+     * @param entityName  name of entity
+     * @param clusterFS   hadoop file system for the locations
+     * @param entityType  feed or process
+     */
+    public static boolean assertPath(boolean logFlag, String entityName, FileSystem clusterFS,
+                                     String entityType) throws Exception {
+        String stagingDir= MerlinConstants.STAGING_LOCATION;
+        String path=stagingDir+"/falcon/workflows/"+ entityType + "/" + entityName +"/logs";
+        List<Path> logmoverPaths = HadoopUtil
+                .getAllFilesRecursivelyHDFS(clusterFS, new Path(HadoopUtil.cutProtocol(path)));
+        String part = logFlag ? "SUCCEEDED" : "FAILED";
+        for (Path logmoverPath : logmoverPaths) {
+            if (logmoverPath.toString().contains(part)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Checks that job logs are copied to user defined cluster staging path.
+     *
 +     * @param logFlag     denotes whether it is a failed/succeeded log
+     * @param entityName  name of entity
+     * @param clusterFS   hadoop file system for the locations
+     * @param entityType  feed or process
+     * @param message     message returned if assert fails
+     */
+    public static void assertLogMoverPath(boolean logFlag, String entityName, FileSystem
clusterFS,
+                                          String entityType, String message) throws Exception
{
+        Assert.assertTrue(assertPath(logFlag, entityName, clusterFS, entityType), message);
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/28fd15c4/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
index 6fd2348..a7a2ea8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
@@ -36,11 +36,9 @@ import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
@@ -53,7 +51,6 @@ import org.testng.annotations.Test;
 
 import javax.xml.bind.JAXBException;
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.List;
 
 /**
@@ -105,8 +102,7 @@ public class FeedReplicationTest extends BaseTestClass {
      */
     @Test(dataProvider = "dataFlagProvider")
     public void replicate1Source1Target(boolean dataFlag)
-        throws AuthenticationException, IOException, URISyntaxException, JAXBException,
-            OozieClientException, InterruptedException {
+        throws Exception {
         Bundle.submitCluster(bundles[0], bundles[1]);
         String startTime = TimeUtil.getTimeWrtSystemTime(0);
         String endTime = TimeUtil.addMinsToTime(startTime, 5);
@@ -158,7 +154,7 @@ public class FeedReplicationTest extends BaseTestClass {
 
         //replication should start, wait while it ends
         InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed.toString()),
1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data has been replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
@@ -173,6 +169,9 @@ public class FeedReplicationTest extends BaseTestClass {
 
         //_SUCCESS should exist in target
         Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, ""), true);
+
+        AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()),
+                cluster2FS, "feed", "Success logs are not present");
     }
 
     /**
@@ -195,27 +194,27 @@ public class FeedReplicationTest extends BaseTestClass {
         feed.clearFeedClusters();
         //set cluster1 as source
         feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build());
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.SOURCE)
+                        .build());
         //set cluster2 as target
         feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build());
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.TARGET)
+                        .withDataLocation(targetDataLocation)
+                        .build());
         //set cluster3 as target
         feed.addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build());
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.TARGET)
+                        .withDataLocation(targetDataLocation)
+                        .build());
 
         //submit and schedule feed
         LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
@@ -268,6 +267,9 @@ public class FeedReplicationTest extends BaseTestClass {
         //_SUCCESS should exist in target
         Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, ""), true);
         Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster3FS, toTarget, ""), true);
+
+        AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()),
+                cluster2FS, "feed", "Success logs are not present");
     }
 
     /**
@@ -354,7 +356,7 @@ public class FeedReplicationTest extends BaseTestClass {
 
         //wait till instance succeed
         InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data was replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
@@ -370,8 +372,78 @@ public class FeedReplicationTest extends BaseTestClass {
 
         //availabilityFlag should exist in target
         Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, availabilityFlagName),
true);
+
+        AssertUtil.assertLogMoverPath(true, Util.readEntityName(feed.toString()),
+                cluster2FS, "feed", "Success logs are not present");
     }
 
+    /**
+     * Test demonstrates failure of replication of stored data from one source cluster to
one target cluster.
+     * When replication job fails test checks if failed logs are present in staging directory
or not.
+     */
+    @Test
+    public void replicate1Source1TargetFail()
+        throws Exception {
+        Bundle.submitCluster(bundles[0], bundles[1]);
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 5);
+        LOGGER.info("Time range between : " + startTime + " and " + endTime);
+
+        //configure feed
+        FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
+        feed.setFilePath(feedDataLocation);
+        //erase all clusters from feed definition
+        feed.clearFeedClusters();
+        //set cluster1 as source
+        feed.addFeedCluster(
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.SOURCE)
+                        .build());
+        //set cluster2 as target
+        feed.addFeedCluster(
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.TARGET)
+                        .withDataLocation(targetDataLocation)
+                        .build());
+
+        //submit and schedule feed
+        LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString()));
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString()));
+
+        //upload necessary data
+        DateTime date = new DateTime(startTime, DateTimeZone.UTC);
+        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy'/'MM'/'dd'/'HH'/'mm'");
+        String timePattern = fmt.print(date);
+        String sourceLocation = sourcePath + "/" + timePattern + "/";
+        String targetLocation = targetPath + "/" + timePattern + "/";
+        HadoopUtil.recreateDir(cluster1FS, sourceLocation);
+
+        Path toSource = new Path(sourceLocation);
+        Path toTarget = new Path(targetLocation);
+        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.NORMAL_INPUT + "dataFile.xml");
+        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.NORMAL_INPUT + "dataFile1.txt");
+
+        //check if coordinator exists
+        InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed.toString(), 0);
+        Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"),
1);
+
+        //check if instance become running
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
+                CoordinatorAction.Status.RUNNING, EntityType.FEED);
+
+        HadoopUtil.deleteDirIfExists(sourceLocation, cluster1FS);
+
+        //check if instance became killed
+        InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1,
+                CoordinatorAction.Status.KILLED, EntityType.FEED);
+
+        AssertUtil.assertLogMoverPath(false, Util.readEntityName(feed.toString()),
+                cluster2FS, "feed", "Failed logs are not present");
+    }
 
     /* Flag value denotes whether to add data for replication or not.
      * flag=true : add data for replication.

http://git-wip-us.apache.org/repos/asf/falcon/blob/28fd15c4/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
index 4af27a2..56fe8ab 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java
@@ -24,17 +24,14 @@ import org.apache.falcon.entity.v0.process.Property;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.util.*;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -126,7 +123,7 @@ public class LogMoverTest extends BaseTestClass {
         InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
                 CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
 
-        Assert.assertTrue(validate(true), "Success logs are not present");
+        AssertUtil.assertLogMoverPath(true, processName, clusterFS, "process", "Success logs
are not present");
     }
 
     /**
@@ -143,28 +140,7 @@ public class LogMoverTest extends BaseTestClass {
         InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1,
                         CoordinatorAction.Status.KILLED, EntityType.PROCESS);
 
-        Assert.assertTrue(validate(false), "Filed logs are not present");
-    }
-
-    private boolean validate(boolean logFlag) throws Exception {
-        String stagingDir= MerlinConstants.STAGING_LOCATION;
-        String path=stagingDir+"/falcon/workflows/process/"+processName+"/logs";
-        List<Path> logmoverPaths = HadoopUtil
-                .getAllFilesRecursivelyHDFS(clusterFS, new Path(HadoopUtil.cutProtocol(path)));
-        if (logFlag) {
-            for (Path logmoverPath : logmoverPaths) {
-                if (logmoverPath.toString().contains("SUCCEEDED")) {
-                    return true;
-                }
-            }
-        } else {
-            for (Path logmoverPath : logmoverPaths) {
-                if (logmoverPath.toString().contains("FAILED")) {
-                    return true;
-                }
-            }
-        }
-        return false;
+        AssertUtil.assertLogMoverPath(false, processName, clusterFS, "process", "Failed logs
are not present");
     }
 
 }


Mime
View raw message