falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prag...@apache.org
Subject falcon git commit: FALCON-1566 Add test for SLA monitoring API
Date Thu, 18 Feb 2016 09:50:31 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 54402764c -> 830ab368b


FALCON-1566 Add test for SLA monitoring API

Author: Pragya <mittal.pragya23@gmail.com>

Reviewers: Paul Isaychuk <pisaychuk@apache.org>, Deepak Kumar Barr <deepak.barr@gmail.com>

Closes #44 from pragya-mittal/feed-sla and squashes the following commits:

c6c6f65 [Pragya] Review comments addressed
251d404 [Pragya] Resolved merge conflicts
f5b2888 [Pragya] FALCON-1566 Add test for SLA monitoring API
f037385 [Pragya] Merge branch 'master' of https://github.com/apache/falcon
4c19ec0 [Pragya] Merge branch 'master' of https://github.com/apache/falcon
3b7fd63 [Pragya] FALCON-1829 Add regression for submit and schedule process on native scheduler (time based)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/830ab368
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/830ab368
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/830ab368

Branch: refs/heads/master
Commit: 830ab368b7454151033fff137ee1640dbcee3e41
Parents: 5440276
Author: Pragya <mittal.pragya23@gmail.com>
Authored: Thu Feb 18 15:20:02 2016 +0530
Committer: Pragya Mittal <mittal.pragya23@gmail.com>
Committed: Thu Feb 18 15:20:02 2016 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   2 +
 .../helpers/entity/AbstractEntityHelper.java    |  17 +-
 .../core/response/ServiceResponse.java          |  10 +
 .../falcon/regression/core/util/Util.java       |   3 +-
 .../regression/SLA/FeedSLAMonitoringTest.java   | 258 +++++++++++++++++++
 .../nativeScheduler/NativeScheduleTest.java     |   7 +-
 6 files changed, 293 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/830ab368/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 566f7e1..e3f7264 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-1566 Add test for SLA monitoring API (Pragya Mittal)
+
    FALCON-1567 Test case for Lifecycle feature  (Pragya Mittal)
 
    FALCON-1784 Add regression test for for FALCON-1647 (Paul Isaychuk)

http://git-wip-us.apache.org/repos/asf/falcon/blob/830ab368/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
index 27e12d0..e1a9288 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
@@ -701,7 +701,7 @@ public abstract class AbstractEntityHelper {
         if (StringUtils.isNotEmpty(params)){
             url += colo.isEmpty() ? "?" + params : "&" + params;
         }
-        return Util.sendRequestLineage(createUrl(url), "get", null, null);
+        return Util.sendJSONRequest(createUrl(url), "get", null, null);
     }
 
     /**
@@ -715,4 +715,19 @@ public abstract class AbstractEntityHelper {
                 .createAndSendRequestProcessInstance(url, params, allColo, user);
     }
 
+    /**
+     * Retrieves sla alerts.
+     * @param params list of optional parameters
+     * @return instances with sla missed.
+     */
+    public ServiceResponse getSlaAlert(String params)
+        throws URISyntaxException, AuthenticationException, InterruptedException, IOException {
+        String url = createUrl(this.hostname + URLS.SLA.getValue(),
+                getEntityType());
+        if (StringUtils.isNotEmpty(params)) {
+            url +=  params;
+        }
+        return Util.sendJSONRequest(createUrl(url), "get", null, null);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/830ab368/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
index 55e862c..f66d426 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
@@ -25,6 +25,7 @@ import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.EntitySummaryResult;
 import org.apache.falcon.resource.LineageGraphResult;
+import org.apache.falcon.resource.SchedulableEntityInstanceResult;
 import org.apache.http.HttpResponse;
 import org.apache.log4j.Logger;
 
@@ -121,4 +122,13 @@ public class ServiceResponse {
         return lineageGraphResult;
     }
 
+    /**
+     * Retrieves SchedulableEntityInstanceResult from a message if possible.
+     * @return SchedulableEntityInstanceResult
+     */
+    public SchedulableEntityInstanceResult getSlaResult() {
+        SchedulableEntityInstanceResult schedulableEntityInstanceResult = new GsonBuilder().create().fromJson(message,
+                SchedulableEntityInstanceResult.class);
+        return schedulableEntityInstanceResult;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/830ab368/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
index ccd083b..452effa 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
@@ -385,6 +385,7 @@ public final class Util {
         STATUS_URL("/api/entities/status"),
         ENTITY_SUMMARY("/api/entities/summary"),
         SUBMIT_AND_SCHEDULE_URL("/api/entities/submitAndSchedule"),
+        SLA("/api/entities/sla-alert"),
         ENTITY_LINEAGE("/api/metadata/lineage/entities"),
         INSTANCE_RUNNING("/api/instance/running"),
         INSTANCE_STATUS("/api/instance/status"),
@@ -595,7 +596,7 @@ public final class Util {
      * @throws URISyntaxException
      * @throws AuthenticationException
      */
-    public static ServiceResponse sendRequestLineage(String url, String method, String data, String user)
+    public static ServiceResponse sendJSONRequest(String url, String method, String data, String user)
         throws IOException, URISyntaxException, AuthenticationException, InterruptedException {
         BaseRequest request = new BaseRequest(url, method, user, data);
         request.addHeader(RequestKeys.CONTENT_TYPE_HEADER, RequestKeys.JSON_CONTENT_TYPE);

http://git-wip-us.apache.org/repos/asf/falcon/blob/830ab368/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java
new file mode 100644
index 0000000..fa1f808
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/SLA/FeedSLAMonitoringTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.SLA;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.resource.SchedulableEntityInstanceResult;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collections;
+
+
+/**
+ * Feed SLA monitoring tests.
+ * Test assumes following properties are set in startup.properties of server :
+ *      *.feed.sla.statusCheck.frequency.seconds=60
+ *      *.feed.sla.lookAheadWindow.millis=60000
+ */
+@Test(groups = { "distributed", "embedded" })
+public class FeedSLAMonitoringTest extends BaseTestClass {
+
+    private ColoHelper cluster = servers.get(0);
+    private FileSystem clusterFS = serverFS.get(0);
+    private String baseTestHDFSDir = cleanAndGetTestDir();
+    private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN;
+    private List<String> slaFeedNames;
+    private List<Frequency> slaFeedFrequencies;
+    private String clusterName;
+    private static final Logger LOGGER = Logger.getLogger(FeedSLAMonitoringTest.class);
+
+    private String startTime;
+    private String endTime;
+    private String slaStartTime;
+    private String slaEndTime;
+    private int noOfFeeds;
+    private int statusCheckFrequency;
+
+    private static final Comparator<SchedulableEntityInstance> DEPENDENCY_COMPARATOR =
+            new Comparator<SchedulableEntityInstance>() {
+                @Override
+                public int compare(SchedulableEntityInstance o1, SchedulableEntityInstance o2) {
+                    return o1.compareTo(o2);
+                }
+            };
+
+    /**
+     * Submitting 3 feeds with different frequencies and sla values.
+     * @throws Exception
+     */
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle(this);
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        clusterName = bundles[0].getClusterNames().get(0);
+        ServiceResponse response =
+                prism.getClusterHelper().submitEntity(bundles[0].getClusters().get(0));
+        AssertUtil.assertSucceeded(response);
+
+        startTime = TimeUtil.getTimeWrtSystemTime(-10);
+        endTime = TimeUtil.addMinsToTime(startTime, 20);
+        noOfFeeds=3;
+
+        LOGGER.info("Time range between : " + startTime + " and " + endTime);
+        final String oldFeedName = bundles[0].getInputFeedNameFromBundle();
+        slaFeedFrequencies = Arrays.asList(new Frequency("1", Frequency.TimeUnit.minutes),
+                new Frequency("2", Frequency.TimeUnit.minutes),
+                new Frequency("4", Frequency.TimeUnit.minutes));
+
+        slaFeedNames = Arrays.asList(oldFeedName + "-1", oldFeedName + "-2", oldFeedName + "-3");
+
+        //Submit 3 feeds with different frequencies and sla values.
+        for (int bIndex = 0; bIndex < noOfFeeds; ++bIndex) {
+            final FeedMerlin ipFeed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+
+            ipFeed.setValidity(startTime, endTime);
+            ipFeed.setAvailabilityFlag("_SUCCESS");
+
+            //set slaLow and slaHigh
+            ipFeed.setSla(new Frequency("1", Frequency.TimeUnit.minutes),
+                    new Frequency("2", Frequency.TimeUnit.minutes));
+            ipFeed.setName(slaFeedNames.get(bIndex));
+            ipFeed.setFrequency(slaFeedFrequencies.get(bIndex));
+
+            LOGGER.info("Feed is : " + ipFeed.toString());
+
+            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(ipFeed.toString()));
+        }
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws IOException {
+        cleanTestsDirs();
+        removeTestClassEntities();
+    }
+
+    /**
+     * The following test submits 3 feeds, checks the slaAlert for a given time range and validates its output.
+     * It also checks the sla status when feed is deleted , data created with/without _SUCCESS folder.
+     * @throws Exception
+     */
+    @Test
+    public void feedSLATest() throws Exception {
+        /**TEST : Check sla response for a given time range
+         */
+
+        statusCheckFrequency=60; // 60 seconds
+
+        // Map of instanceDate and corresponding list of SchedulableEntityInstance
+        Map<String, List<SchedulableEntityInstance>> instanceEntityMap = new HashMap<>();
+
+        slaStartTime = startTime;
+        slaEndTime = TimeUtil.addMinsToTime(slaStartTime, 10);
+        DateTime slaStartDate = TimeUtil.oozieDateToDate(slaStartTime);
+        DateTime slaEndDate = TimeUtil.oozieDateToDate(slaEndTime);
+
+        List<SchedulableEntityInstance> expectedInstances = new ArrayList<>();
+        SchedulableEntityInstance expectedSchedulableEntityInstance;
+
+        for (int index = 0; index < noOfFeeds; ++index) {
+
+            DateTime dt = new DateTime(slaStartDate);
+            while (!dt.isAfter(slaEndDate)) {
+
+                expectedSchedulableEntityInstance = new SchedulableEntityInstance(slaFeedNames.get(index),
+                        clusterName, dt.toDate(), EntityType.FEED);
+                expectedSchedulableEntityInstance.setTags("Missed SLA High");
+                expectedInstances.add(expectedSchedulableEntityInstance);
+
+                if (!instanceEntityMap.containsKey(dt.toString())) {
+                    instanceEntityMap.put(dt.toString(), new ArrayList<SchedulableEntityInstance>());
+                }
+                instanceEntityMap.get(dt.toString()).add(expectedSchedulableEntityInstance);
+                dt = dt.plusMinutes(slaFeedFrequencies.get(index).getFrequencyAsInt());
+
+            }
+        }
+
+        TimeUtil.sleepSeconds(statusCheckFrequency);
+
+        SchedulableEntityInstanceResult response = prism.getFeedHelper().getSlaAlert(
+                "?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
+
+        LOGGER.info(response.getMessage());
+
+        validateInstances(response, expectedInstances);
+
+        /**TEST : Create missing dependencies with _SUCCESS directory and check sla response
+         */
+
+        String dateEntry = (String) instanceEntityMap.keySet().toArray()[1];
+        LOGGER.info(dateEntry + "/" + instanceEntityMap.get(dateEntry));
+        List<String> dataDates = InstanceUtil.getMinuteDatesToPath(dateEntry, dateEntry, 0);
+
+        HadoopUtil.createFolders(clusterFS, baseTestHDFSDir + "/input/", dataDates);
+
+        //sla response for feeds when _SUCCESS file is missing from dataPath
+        response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
+
+        // Response does not change as it checks for _SUCCESS file
+        validateInstances(response, expectedInstances);
+
+        //Create _SUCCESS file
+        HadoopUtil.recreateDir(clusterFS, baseTestHDFSDir + "/input/" + dataDates.get(0) + "/_SUCCESS");
+        for (SchedulableEntityInstance instance : instanceEntityMap.get(dateEntry)) {
+            expectedInstances.remove(instance);
+        }
+        instanceEntityMap.remove(dateEntry);
+
+        TimeUtil.sleepSeconds(statusCheckFrequency);
+
+        //sla response for feeds when _SUCCESS file is available in dataPath
+        response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
+        validateInstances(response, expectedInstances);
+
+        /** TEST : Delete feed and check sla response
+         */
+        String deletedFeed = slaFeedNames.get(0);
+        prism.getFeedHelper().deleteByName(deletedFeed, null);
+
+        for (Map.Entry<String, List<SchedulableEntityInstance>> entry : instanceEntityMap.entrySet())
+        {
+            LOGGER.info(entry.getKey() + "/" + entry.getValue());
+            for (SchedulableEntityInstance instance : entry.getValue()) {
+                if (instance.getEntityName().equals(deletedFeed)) {
+                    expectedInstances.remove(instance);
+                }
+            }
+
+        }
+        TimeUtil.sleepSeconds(statusCheckFrequency);
+        response = prism.getFeedHelper().getSlaAlert("?start=" + slaStartTime + "&end=" + slaEndTime).getSlaResult();
+        validateInstances(response, expectedInstances);
+
+    }
+
+    /**
+     * Validating expected response with actual response.
+     * @param response SchedulableEntityInstanceResult response
+     * @param expectedInstances List of expected instances
+     */
+    private static void validateInstances(SchedulableEntityInstanceResult response,
+            List<SchedulableEntityInstance> expectedInstances) {
+
+        List<SchedulableEntityInstance> actualInstances = Arrays.asList(response.getInstances());
+
+        for (SchedulableEntityInstance instance : actualInstances) {
+            instance.setTags("Missed SLA High");
+        }
+
+        Collections.sort(expectedInstances, DEPENDENCY_COMPARATOR);
+        Collections.sort(actualInstances, DEPENDENCY_COMPARATOR);
+
+        Assert.assertEquals(actualInstances, expectedInstances, "Instances mismatch for");
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/830ab368/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
index fe61cdf..54e7805 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
@@ -21,10 +21,13 @@ import org.apache.falcon.regression.Entities.ProcessMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.response.ServiceResponse;
-import org.apache.falcon.regression.core.util.*;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.log4j.Logger;
-import org.apache.oozie.client.OozieClient;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;


Mime
View raw message