falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject git commit: FALCON-26 Pipeline Monitor addition. Contributed by Suhas Vasu
Date Tue, 28 Jan 2014 06:57:08 GMT
Updated Branches:
  refs/heads/master d1642beab -> 5c9304406


FALCON-26 Pipeline Monitor addition. Contributed by Suhas Vasu


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/5c930440
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/5c930440
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/5c930440

Branch: refs/heads/master
Commit: 5c930440668af663c228a987e666bcbb7423e71e
Parents: d1642be
Author: Shwetha GS <shwethags@gmail.com>
Authored: Tue Jan 28 12:27:00 2014 +0530
Committer: Shwetha GS <shwethags@gmail.com>
Committed: Tue Jan 28 12:27:00 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../java/org/apache/falcon/cli/FalconCLI.java   | 14 ++-
 .../org/apache/falcon/client/FalconClient.java  | 44 ++++++++-
 .../falcon/resource/InstancesSummaryResult.java | 99 ++++++++++++++++++++
 .../workflow/engine/AbstractWorkflowEngine.java |  3 +
 docs/src/site/twiki/FalconCLI.twiki             | 13 +++
 docs/src/site/twiki/FalconDocumentation.twiki   |  4 +-
 docs/src/site/twiki/restapi/InstanceLogs.twiki  |  3 +-
 .../src/site/twiki/restapi/InstanceStatus.twiki |  1 +
 .../site/twiki/restapi/InstanceSummary.twiki    | 45 +++++++++
 .../workflow/engine/OozieWorkflowEngine.java    | 96 ++++++++++++++++++-
 .../resource/AbstractInstanceManager.java       | 20 ++++
 .../resource/proxy/InstanceManagerProxy.java    | 82 ++++++++++++++++
 .../apache/falcon/resource/InstanceManager.java | 12 +++
 .../java/org/apache/falcon/cli/FalconCLIIT.java | 30 ++++++
 15 files changed, 462 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0ea0245..02a5df0 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-26 Pipeline Monitor addition. (Suhas Vasu via Shwetha GS)
+
     FALCON-254 Bootstrap designer module. (Srikanth Sundarrajan via Shwetha GS)
 
     FALCON-238 Support updates at specific time. (Shwetha GS)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 37ccf4f..6cb388e 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -67,6 +67,7 @@ public class FalconCLI {
     public static final String SUBMIT_AND_SCHEDULE_OPT = "submitAndSchedule";
     public static final String VALIDATE_OPT = "validate";
     public static final String STATUS_OPT = "status";
+    public static final String SUMMARY_OPT = "summary";
     public static final String DEFINITION_OPT = "definition";
     public static final String DEPENDENCY_OPT = "dependency";
     public static final String LIST_OPT = "list";
@@ -191,6 +192,8 @@ public class FalconCLI {
             result = client.getRunningInstances(type, entity, colo);
         } else if (optionsList.contains(STATUS_OPT)) {
             result = client.getStatusOfInstances(type, entity, start, end, colo);
+        } else if (optionsList.contains(SUMMARY_OPT)) {
+            result = client.getSummaryOfInstances(type, entity, start, end, colo);
         } else if (optionsList.contains(KILL_OPT)) {
             result = client.killInstances(type, entity, start, end, colo, clusters, sourceClusters);
         } else if (optionsList.contains(SUSPEND_OPT)) {
@@ -235,7 +238,8 @@ public class FalconCLI {
         if (optionsList.contains(CLUSTERS_OPT)) {
             if (optionsList.contains(RUNNING_OPT)
                     || optionsList.contains(LOG_OPT)
-                    || optionsList.contains(STATUS_OPT)) {
+                    || optionsList.contains(STATUS_OPT)
+                    || optionsList.contains(SUMMARY_OPT)) {
                 throw new FalconCLIException("Invalid argument: clusters");
             }
         }
@@ -243,7 +247,8 @@ public class FalconCLI {
         if (optionsList.contains(SOURCECLUSTER_OPT)) {
             if (optionsList.contains(RUNNING_OPT)
                     || optionsList.contains(LOG_OPT)
-                    || optionsList.contains(STATUS_OPT) || !type.equals("feed")) {
+                    || optionsList.contains(STATUS_OPT)
+                    || optionsList.contains(SUMMARY_OPT) || !type.equals("feed")) {
                 throw new FalconCLIException("Invalid argument: sourceClusters");
             }
         }
@@ -460,6 +465,10 @@ public class FalconCLI {
                 STATUS_OPT,
                 false,
                 "Gets status of process instances for a given process in the range start
time and optional end time");
+        Option summary = new Option(
+                SUMMARY_OPT,
+                false,
+                "Gets summary of instances for a given process in the range start time and
optional end time");
         Option kill = new Option(
                 KILL_OPT,
                 false,
@@ -494,6 +503,7 @@ public class FalconCLI {
         OptionGroup group = new OptionGroup();
         group.addOption(running);
         group.addOption(status);
+        group.addOption(summary);
         group.addOption(kill);
         group.addOption(resume);
         group.addOption(suspend);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index e64bfe8..4a7207f 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -27,6 +27,7 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.InstancesSummaryResult;
 
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.MediaType;
@@ -40,6 +41,7 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -137,7 +139,8 @@ public class FalconClient {
         SUSPEND("api/instance/suspend/", HttpMethod.POST, MediaType.APPLICATION_JSON),
         RESUME("api/instance/resume/", HttpMethod.POST, MediaType.APPLICATION_JSON),
         RERUN("api/instance/rerun/", HttpMethod.POST, MediaType.APPLICATION_JSON),
-        LOG("api/instance/logs/", HttpMethod.GET, MediaType.APPLICATION_JSON);
+        LOG("api/instance/logs/", HttpMethod.GET, MediaType.APPLICATION_JSON),
+        SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON);
 
         private String path;
         private String method;
@@ -279,6 +282,14 @@ public class FalconClient {
                 null, null, colo);
     }
 
+    public String getSummaryOfInstances(String type, String entity,
+                                       String start, String end,
+                                       String colo) throws FalconCLIException {
+
+        return sendInstanceRequest(Instances.SUMMARY, type, entity, start, end,
+                null, null, colo);
+    }
+
     public String killInstances(String type, String entity, String start,
                                 String end, String colo, String clusters,
                                 String sourceClusters)
@@ -547,6 +558,8 @@ public class FalconClient {
 
         if (instances.name().equals("LOG")) {
             return parseProcessInstanceResultLogs(clientResponse, runid);
+        } else if (instances.name().equals("SUMMARY")) {
+            return summarizeProcessInstanceResult(clientResponse);
         } else {
             return parseProcessInstanceResult(clientResponse);
         }
@@ -587,6 +600,35 @@ public class FalconClient {
         return clientResponse.getEntity(String.class);
     }
 
+    private String summarizeProcessInstanceResult(ClientResponse clientResponse) {
+        InstancesSummaryResult result = clientResponse
+                .getEntity(InstancesSummaryResult.class);
+        StringBuilder sb = new StringBuilder();
+        String toAppend;
+
+        sb.append("Consolidated Status: ").append(result.getStatus()).append("\n");
+        sb.append("\nInstances Summary:\n");
+
+        if (result.getInstancesSummary() != null) {
+            for (InstancesSummaryResult.InstanceSummary summary : result.getInstancesSummary())
{
+                toAppend = summary.getCluster() != null ? summary.getCluster() : "-";
+                sb.append("Cluster: ").append(toAppend).append("\n");
+
+                sb.append("Status\t\tCount\n");
+                sb.append("-------------------------\n");
+
+                for (Map.Entry<String, Long> entry : summary.getSummaryMap().entrySet())
{
+                    sb.append(entry.getKey()).append("\t\t").append(entry.getValue()).append("\n");
+                }
+            }
+        }
+
+        sb.append("\nAdditional Information:\n");
+        sb.append("Response: ").append(result.getMessage());
+        sb.append("Request Id: ").append(result.getRequestId());
+        return sb.toString();
+    }
+
     private String parseProcessInstanceResult(ClientResponse clientResponse) {
         InstancesResult result = clientResponse
                 .getEntity(InstancesResult.class);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java
new file mode 100644
index 0000000..0758c8b
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesSummaryResult.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Map;
+
+/**
+ * Pojo for JAXB marshalling / unmarshalling.
+ */
+
+//SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+@XmlRootElement
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class InstancesSummaryResult extends APIResult {
+
+    @XmlElement
+    private InstanceSummary[] instancesSummary;
+
+    private InstancesSummaryResult() { // for jaxb
+        super();
+    }
+
+    public InstancesSummaryResult(String message, InstanceSummary[] instancesSummary) {
+        this(Status.SUCCEEDED, message, instancesSummary);
+    }
+
+    public InstancesSummaryResult(Status status, String message,
+                                  InstanceSummary[] instancesSummary) {
+        super(status, message);
+        this.instancesSummary = instancesSummary;
+    }
+
+    public InstancesSummaryResult(Status status, String message) {
+        super(status, message);
+    }
+
+    public InstanceSummary[] getInstancesSummary() {
+        return instancesSummary;
+    }
+
+    public void setInstancesSummary(InstanceSummary[] instancesSummary) {
+        this.instancesSummary = instancesSummary;
+    }
+
+    /**
+     * A single instance object inside instance result.
+     */
+    @XmlRootElement(name = "instance-summary")
+    public static class InstanceSummary {
+
+        @XmlElement
+        public String cluster;
+        @XmlElementWrapper(name="map")
+        public Map<String, Long> summaryMap;
+
+        public InstanceSummary() {
+        }
+
+        public InstanceSummary(String cluster, Map<String, Long> summaryMap) {
+            this.cluster = cluster;
+            this.summaryMap = summaryMap;
+        }
+
+        public Map<String, Long> getSummaryMap() {
+            return summaryMap;
+        }
+
+        public String getCluster() {
+            return cluster;
+        }
+
+        @Override
+        public String toString() {
+            return "cluster: " + (this.cluster == null ? "" : this.cluster)
+                    + "summaryMap: " + summaryMap.toString();
+        }
+    }
+
+}
+//RESUME CHECKSTYLE CHECK VisibilityModifierCheck

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 043d622..025e1b7 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -22,6 +22,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.InstancesSummaryResult;
 
 import java.util.Date;
 import java.util.HashSet;
@@ -77,6 +78,8 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract InstancesResult getStatus(Entity entity, Date start, Date end) throws
FalconException;
 
+    public abstract InstancesSummaryResult getSummary(Entity entity, Date start, Date end)
throws FalconException;
+
     public abstract Date update(Entity oldEntity, Entity newEntity, String cluster, Date
end) throws FalconException;
 
     public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index 4163a38..1d451be 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -137,6 +137,19 @@ Example : Suppose a process has 3 instance, one has succeeded,one is
in running
 Usage:
 $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>>
-status -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'"
 
+---+++Summary
+
+Summary option via CLI can be used to get the consolidated status of the instances within
the specified time period.
+Each status, along with the corresponding instance count, is listed for each of the applicable
colos.
+The unscheduled instances within the specified time period are included as UNSCHEDULED in
the output to provide more clarity.
+
+Example : Suppose a process has 3 instances: one has succeeded, one is in running state and
the other one is waiting. The expected output is:
+
+{"status":"SUCCEEDED","message":"getSummary is successful", "cluster": <<name>>
[{"SUCCEEDED":"1"}, {"WAITING":"1"}, {"RUNNING":"1"}]}
+
+Usage:
+$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>>
-summary -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'"
+
 ---+++Running
 
 Running option provides all the running instances of the mentioned process.

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/docs/src/site/twiki/FalconDocumentation.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconDocumentation.twiki b/docs/src/site/twiki/FalconDocumentation.twiki
index 71affa0..1c96ecf 100644
--- a/docs/src/site/twiki/FalconDocumentation.twiki
+++ b/docs/src/site/twiki/FalconDocumentation.twiki
@@ -213,7 +213,9 @@ Parameters -start and -end are used to mention the date range within which
you w
    * 4. *suspend*: -suspend is used to suspend a instance or instances for the given process.
This option pauses the parent workflow at the state, which it was in at the time of execution
of this command. This command is similar to SUSPEND process command in functionality only
difference being, SUSPEND process suspends all the instance whereas suspend instance suspend
only that instance or instances in the range. 
 
    * 5.	*resume*: -resume option is used to resume any instance that is in suspended state.
(Note: due to a bug in oozie -resume option in some cases may not actually resume the suspended
instance/ instances)
-   * 6. *kill*: -kill option can be used to kill an instance or multiple instances 
+   * 6. *kill*: -kill option can be used to kill an instance or multiple instances
+
+   * 7. *summary*: -summary option via CLI can be used to get the consolidated status of
the instances within the specified time period. Each status, along with the corresponding
instance count, is listed for each of the applicable colos.
 
 
 In all the cases where your request is syntactically correct but logically not, the instance
/ instances are returned with the same status as earlier. Example: trying to resume a KILLED
/ SUCCEEDED instance will return the instance with KILLED / SUCCEEDED, without actually performing
any operation. This is so because only an instance in SUSPENDED state can be resumed. Same
thing is valid for rerun a SUSPENDED or RUNNING options etc. 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/docs/src/site/twiki/restapi/InstanceLogs.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceLogs.twiki b/docs/src/site/twiki/restapi/InstanceLogs.twiki
index ad8d6f5..f84b828 100644
--- a/docs/src/site/twiki/restapi/InstanceLogs.twiki
+++ b/docs/src/site/twiki/restapi/InstanceLogs.twiki
@@ -10,7 +10,8 @@ Get log of a specific instance of an entity.
 ---++ Parameters
    * :entity-type can either be a feed or a process.
    * :entity-name is name of the entity.
-   * start is the start time of the instace that you want to refer to
+   * start is the start time of the instance that you want to refer to
+   * end <optional param> is the end time of the instance that you want to refer to
 
 ---++ Results
 Log of specified instance.

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/docs/src/site/twiki/restapi/InstanceStatus.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceStatus.twiki b/docs/src/site/twiki/restapi/InstanceStatus.twiki
index 5f4a353..eddc2c8 100644
--- a/docs/src/site/twiki/restapi/InstanceStatus.twiki
+++ b/docs/src/site/twiki/restapi/InstanceStatus.twiki
@@ -11,6 +11,7 @@ Get status of a specific instance of an entity.
    * :entity-type can either be a feed or a process.
    * :entity-name is name of the entity.
    * start is the start time of the instance that you want to refer to
+   * end <optional param> is the end time of the instance that you want to refer to
 
 ---++ Results
 Status of the specified instance.

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/docs/src/site/twiki/restapi/InstanceSummary.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceSummary.twiki b/docs/src/site/twiki/restapi/InstanceSummary.twiki
new file mode 100644
index 0000000..bd1d2e5
--- /dev/null
+++ b/docs/src/site/twiki/restapi/InstanceSummary.twiki
@@ -0,0 +1,45 @@
+---++  GET /api/instance/summary/:entity-type/:entity-name
+   * <a href="#Description">Description</a>
+   * <a href="#Parameters">Parameters</a>
+   * <a href="#Results">Results</a>
+   * <a href="#Examples">Examples</a>
+
+---++ Description
+Get summary of instance/instances of an entity.
+
+---++ Parameters
+   * :entity-type can either be a feed or a process.
+   * :entity-name is name of the entity.
+   * start is the start time of the instance that you want to refer to
+   * end <optional param> is the end time of the instance that you want to refer to
+
+---++ Results
+Summary of the instances over the specified time range
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/instance/summary/process/WordCount?colo=*&start=2014-01-21T13:00Z&end=2014-01-21T16:00Z
+Remote-User: suhas
+</verbatim>
+---+++ Result
+<verbatim>
+{
+    "status":"SUCCEEDED",
+    "message":"default/SUMMARY\n",
+    "requestId":"default/c344567b-da73-44d5-bcd4-bf456524934c\n",
+    "instancesSummary":
+        {
+            "cluster":"local",
+            "map":
+                {
+                    "entry":
+                        {
+                            "key":"SUCCEEDED",
+                            "value":"3"
+                        }
+                }
+        }
+}
+</verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 5a2a863..7c3a425 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -31,6 +31,8 @@ import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesResult.Instance;
 import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
+import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary;
 import org.apache.falcon.update.UpdateHelper;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.workflow.OozieWorkflowBuilder;
@@ -461,8 +463,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return doJobAction(JobAction.STATUS, entity, start, end, null);
     }
 
+    @Override
+    public InstancesSummaryResult getSummary(Entity entity, Date start, Date end)
+        throws FalconException {
+
+        return doSummaryJobAction(entity, start, end, null);
+    }
+
     private static enum JobAction {
-        KILL, SUSPEND, RESUME, RERUN, STATUS
+        KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY
     }
 
     private WorkflowJob getWorkflowInfo(String cluster, String wfId)
@@ -532,6 +541,91 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return instancesResult;
     }
 
+    private InstancesSummaryResult doSummaryJobAction(Entity entity,
+                                        Date start, Date end, Properties props) throws FalconException
{
+
+        Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
+        List<InstanceSummary> instances = new ArrayList<InstanceSummary>();
+        List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
+
+        for (Map.Entry<String, List<BundleJob>> entry : bundlesMap.entrySet())
{
+            Map<String, Long> instancesSummary = new HashMap<String, Long>();
+            String cluster = entry.getKey();
+            if (clusterList.size() != 0 && !clusterList.contains(cluster)) {
+                continue;
+            }
+
+            List<BundleJob> bundles = entry.getValue();
+            OozieClient client = OozieClientFactory.get(cluster);
+            List<CoordinatorJob> applicableCoords = getApplicableCoords(entity, client,
start, end, bundles);
+            long unscheduledInstances = 0;
+            boolean isLastCoord = false;
+
+            for (int i = 0; i < applicableCoords.size(); i++) {
+                CoordinatorJob coord = applicableCoords.get(i);
+                Frequency freq = createFrequency(String.valueOf(coord.getFrequency()), coord.getTimeUnit());
+                TimeZone tz = EntityUtil.getTimeZone(coord.getTimeZone());
+                Date iterStart = EntityUtil.getNextStartTime(coord.getStartTime(), freq,
tz, start);
+                Date iterEnd = (coord.getLastActionTime().before(end) ? coord.getLastActionTime()
: end);
+
+                if (i == (applicableCoords.size() - 1)) {
+                    isLastCoord = true;
+                }
+
+                int startActionNumber = EntityUtil.getInstanceSequence(coord.getStartTime(),
freq, tz, iterStart);
+                int lastMaterializedActionNumber = EntityUtil.getInstanceSequence(coord.getStartTime(),
+                        freq, tz, iterEnd);
+                int endActionNumber = EntityUtil.getInstanceSequence(coord.getStartTime(),
freq, tz, end);
+
+                if (lastMaterializedActionNumber < startActionNumber) {
+                    continue;
+                }
+
+                if (isLastCoord && endActionNumber != lastMaterializedActionNumber)
{
+                    unscheduledInstances = endActionNumber - lastMaterializedActionNumber;
+                }
+
+                CoordinatorJob coordJob;
+                try {
+                    coordJob = client.getCoordJobInfo(coord.getId(), null, startActionNumber,
+                            (lastMaterializedActionNumber - startActionNumber));
+                } catch (OozieClientException e) {
+                    LOG.debug("Unable to get details for coordinator " + coord.getId() +
" " + e.getMessage());
+                    throw new FalconException(e);
+                }
+
+                if (coordJob != null) {
+                    updateInstanceSummary(coordJob, instancesSummary);
+                }
+            }
+
+            if (unscheduledInstances > 0) {
+                instancesSummary.put("UNSCHEDULED", unscheduledInstances);
+            }
+
+            InstanceSummary summary= new InstanceSummary(cluster, instancesSummary);
+            instances.add(summary);
+        }
+
+        InstancesSummaryResult instancesSummaryResult = new InstancesSummaryResult(APIResult.Status.SUCCEEDED,
+                JobAction.SUMMARY.name());
+        instancesSummaryResult.setInstancesSummary(instances.toArray(new InstanceSummary[instances.size()]));
+        return instancesSummaryResult;
+    }
+
+    private void updateInstanceSummary(CoordinatorJob coordJob, Map<String, Long> instancesSummary)
{
+        List<CoordinatorAction> actions = coordJob.getActions();
+
+        for (CoordinatorAction coordAction :  actions) {
+            if (instancesSummary.containsKey(coordAction.getStatus().name())) {
+                instancesSummary.put(coordAction.getStatus().name(),
+                        instancesSummary.get(coordAction.getStatus().name()) + 1L);
+            } else {
+                instancesSummary.put(coordAction.getStatus().name(), 1L);
+            }
+        }
+    }
+
     private String performAction(String cluster, JobAction action, CoordinatorAction coordinatorAction,
                                  Properties props) throws FalconException {
         WorkflowJob jobInfo = null;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index d94e8c5..4eef9ed 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -96,6 +96,26 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
         }
     }
 
+    public InstancesSummaryResult getSummary(String type, String entity, String startStr,
String endStr,
+                                     String colo) {
+        checkColo(colo);
+        checkType(type);
+        try {
+            validateParams(type, entity, startStr, endStr);
+
+            Date start = EntityUtil.parseDateUTC(startStr);
+            Date end = getEndDate(start, endStr);
+            Entity entityObject = EntityUtil.getEntity(type, entity);
+
+            AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+            return wfEngine.getSummary(entityObject, start, end);
+        } catch (Throwable e) {
+            LOG.error("Failed to get instances status", e);
+            throw FalconWebException
+                    .newInstanceException(e, Response.Status.BAD_REQUEST);
+        }
+    }
+
     public InstancesResult getLogs(String type, String entity, String startStr,
                                    String endStr, String colo, String runId) {
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index 643f98b..407f39a 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -27,6 +27,8 @@ import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.AbstractInstanceManager;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesResult.Instance;
+import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary;
 import org.apache.falcon.resource.channel.Channel;
 import org.apache.falcon.resource.channel.ChannelFactory;
 
@@ -105,6 +107,25 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
     }
 
     @GET
+    @Path("summary/{type}/{entity}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Monitored(event = "instance-summary")
+    @Override
+    public InstancesSummaryResult getSummary(@Dimension("entityType") @PathParam("type") final String type,
+                                             @Dimension("entityName") @PathParam("entity") final String entity,
+                                             @Dimension("start-time") @QueryParam("start") final String startStr,
+                                             @Dimension("end-time") @QueryParam("end") final String endStr,
+                                             @Dimension("colo") @QueryParam("colo") final String colo) {
+        return new InstanceSummaryProxy() {
+            @Override
+            protected InstancesSummaryResult doExecute(String colo) throws FalconException {
+                return getInstanceManager(colo).invoke("getSummary",
+                        type, entity, startStr, endStr, colo);
+            }
+        }.execute(colo, type, entity);
+    }
+
+    @GET
     @Path("logs/{type}/{entity}")
     @Produces(MediaType.APPLICATION_JSON)
     @Monitored(event = "instance-logs")
@@ -239,6 +260,33 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
         protected abstract InstancesResult doExecute(String colo) throws FalconException;
     }
 
+    private abstract class InstanceSummaryProxy {
+
+        public InstancesSummaryResult execute(String coloExpr, String type, String name) {
+            Set<String> colos = getColosFromExpression(coloExpr, type, name);
+
+            Map<String, InstancesSummaryResult> results = new HashMap<String, InstancesSummaryResult>();
+            for (String colo : colos) {
+                try {
+                    InstancesSummaryResult resultHolder = doExecute(colo);
+                    results.put(colo, resultHolder);
+                } catch (FalconException e) {
+                    results.put(colo, new InstancesSummaryResult(APIResult.Status.FAILED,
+                            e.getClass().getName() + "::" + e.getMessage(),
+                            new InstancesSummaryResult.InstanceSummary[0]));
+                }
+            }
+            InstancesSummaryResult finalResult = consolidateInstanceSummaryResult(results);
+            if (finalResult.getStatus() != APIResult.Status.SUCCEEDED) {
+                throw FalconWebException.newException(finalResult, Response.Status.BAD_REQUEST);
+            } else {
+                return finalResult;
+            }
+        }
+
+        protected abstract InstancesSummaryResult doExecute(String colo) throws FalconException;
+    }
+
     private InstancesResult consolidateInstanceResult(Map<String, InstancesResult> results) {
         if (results == null || results.isEmpty()) {
             return null;
@@ -271,4 +319,38 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
         result.setRequestId(requestIds.toString());
         return result;
     }
+
+    private InstancesSummaryResult consolidateInstanceSummaryResult(Map<String, InstancesSummaryResult> results) {
+        if (results == null || results.isEmpty()) {
+            return null;
+        }
+
+        StringBuilder message = new StringBuilder();
+        StringBuilder requestIds = new StringBuilder();
+        List<InstanceSummary> instances = new ArrayList<InstanceSummary>();
+        int statusCount = 0;
+        for (Map.Entry<String, InstancesSummaryResult> entry : results.entrySet()) {
+            String colo = entry.getKey();
+            InstancesSummaryResult result = results.get(colo);
+            message.append(colo).append('/').append(result.getMessage()).append('\n');
+            requestIds.append(colo).append('/').append(result.getRequestId()).append('\n');
+            statusCount += result.getStatus().ordinal();
+
+            if (result.getInstancesSummary() == null) {
+                continue;
+            }
+
+            for (InstanceSummary instance : result.getInstancesSummary()) {
+                instance.summaryMap = instance.getSummaryMap();
+                instances.add(instance);
+            }
+        }
+        InstanceSummary[] arrInstances = new InstanceSummary[instances.size()];
+        APIResult.Status status = (statusCount == 0) ? APIResult.Status.SUCCEEDED
+                : ((statusCount == results.size() * 2) ? APIResult.Status.FAILED : APIResult.Status.PARTIAL);
+        InstancesSummaryResult result = new InstancesSummaryResult(status, message.toString(),
+                instances.toArray(arrInstances));
+        result.setRequestId(requestIds.toString());
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
index 104dfef..fd3cd51 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
@@ -57,6 +57,18 @@ public class InstanceManager extends AbstractInstanceManager {
     }
 
     @GET
+    @Path("summary/{type}/{entity}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Monitored(event = "instance-summary")
+    public InstancesSummaryResult getSummary(@Dimension("type") @PathParam("type") String type,
+                                     @Dimension("entity") @PathParam("entity") String entity,
+                                     @Dimension("start-time") @QueryParam("start") String startStr,
+                                     @Dimension("end-time") @QueryParam("end") String endStr,
+                                     @Dimension("colo") @QueryParam("colo") String colo) {
+        return super.getSummary(type, entity, startStr, endStr, colo);
+    }
+
+    @GET
     @Path("logs/{type}/{entity}")
     @Produces(MediaType.APPLICATION_JSON)
     @Monitored(event = "instance-logs")

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5c930440/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 0767a76..72369c0 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -366,6 +366,36 @@ public class FalconCLIIT {
                         + " -start " + START_INSTANCE));
     }
 
+    public void testInstanceRunningAndSummaryCommands() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        submitTestFiles(context, overlay);
+
+        Assert.assertEquals(0,
+                executeWithURL("entity -schedule -type process -name "
+                        + overlay.get("processName")));
+
+        Assert.assertEquals(0,
+                executeWithURL("entity -schedule -type feed -name "
+                        + overlay.get("outputFeedName")));
+        context.waitForProcessWFtoStart();
+
+        Assert.assertEquals(0,
+                executeWithURL("instance -status -type feed -name "
+                        + overlay.get("outputFeedName")
+                        + " -start " + START_INSTANCE));
+
+        Assert.assertEquals(0,
+                executeWithURL("instance -running -type process -name "
+                        + overlay.get("processName")));
+
+        Assert.assertEquals(0,
+                executeWithURL("instance -summary -type process -name "
+                        + overlay.get("processName")
+                        + " -start " + START_INSTANCE));
+    }
+
+
     public void testInstanceSuspendAndResume() throws Exception {
         TestContext context = new TestContext();
         Map<String, String> overlay = context.getUniqueOverlay();


Mime
View raw message