Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BB20818B61 for ; Tue, 19 Jan 2016 17:14:41 +0000 (UTC) Received: (qmail 85668 invoked by uid 500); 19 Jan 2016 17:14:36 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 85622 invoked by uid 500); 19 Jan 2016 17:14:36 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 85544 invoked by uid 99); 19 Jan 2016 17:14:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Jan 2016 17:14:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 70380DFBAF; Tue, 19 Jan 2016 17:14:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pallavi@apache.org To: commits@falcon.apache.org Date: Tue, 19 Jan 2016 17:14:38 -0000 Message-Id: In-Reply-To: <2bc2a04761514f2f9903ce850ab272eb@git.apache.org> References: <2bc2a04761514f2f9903ce850ab272eb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] falcon git commit: FALCON-1742 Implement instance summary api for native scheduler (By Pallavi Rao) FALCON-1742 Implement instance summary api for native scheduler (By Pallavi Rao) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8a739e1f Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8a739e1f Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8a739e1f Branch: refs/heads/master Commit: 8a739e1f631a9a5c7805c5c9f0e1b0521b6c3a06 Parents: 5fb3a7a Author: Pallavi Rao Authored: Tue Jan 19 19:39:40 2016 +0530 Committer: Pallavi Rao Committed: Tue Jan 19 19:39:40 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../falcon/resource/EntitySummaryResult.java | 2 +- .../falcon/state/store/InMemoryStateStore.java | 21 ++++++++++++ .../falcon/state/store/InstanceStateStore.java | 13 ++++++++ .../falcon/state/store/jdbc/BeanMapperUtil.java | 20 +++++++++++ .../falcon/state/store/jdbc/InstanceBean.java | 3 +- .../falcon/state/store/jdbc/JDBCStateStore.java | 17 ++++++++++ .../workflow/engine/FalconWorkflowEngine.java | 23 ++++++++++++- .../state/service/store/TestJDBCStateStore.java | 35 ++++++++++++++++++++ .../apache/falcon/unit/FalconUnitClient.java | 6 ++++ .../InstanceSchedulerManagerJerseyIT.java | 24 ++++++++++++++ 11 files changed, 163 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 255706d..f616298 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -23,6 +23,8 @@ Proposed Release Version: 0.9 INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1742 Implement instance summary api for native scheduler (Pallavi Rao) + FALCON-1677 Support re-tries for timed-out instances (Narayan Periwal via Pallavi Rao) FALCON-1643 Add CLI option to display captured replication metrics(Peeyush Bishnoi via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java index 4a885ec..3ebfe26 100644 --- a/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java +++ b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java @@ -35,7 +35,7 @@ public class EntitySummaryResult extends APIResult { * Workflow status as being set in result object. */ public static enum WorkflowStatus { - WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR + WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, READY } @XmlElement http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java index c4ced46..69f1e48 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java @@ -218,6 +218,27 @@ public final class InMemoryStateStore extends AbstractStateStore { } @Override + public Map getExecutionInstanceSummary(Entity entity, String cluster, + DateTime start, DateTime end) throws StateStoreException { + Map summary = new HashMap<>(); + for (InstanceState state : getAllExecutionInstances(entity, cluster)) { + ExecutionInstance instance = state.getInstance(); + DateTime instanceTime = instance.getInstanceTime(); + // Start date inclusive and end date exclusive. + // If start date and end date are equal no instances will be added. + if ((instanceTime.isEqual(start) || instanceTime.isAfter(start)) + && instanceTime.isBefore(end)) { + if (summary.containsKey(state.getCurrentState())) { + summary.put(state.getCurrentState(), summary.get(state.getCurrentState()) + 1L); + } else { + summary.put(state.getCurrentState(), 1L); + } + } + } + return summary; + } + + @Override public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException { EntityClusterID id = new EntityClusterID(entity, cluster); if (!entityStates.containsKey(id.getEntityID().getKey())) { http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java index 8ce8fa0..b7269f8 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.state.store; +import java.util.Map; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.state.EntityClusterID; @@ -101,6 +102,18 @@ public interface InstanceStateStore { */ Collection getExecutionInstances(EntityClusterID entityClusterID, Collection states) throws StateStoreException; + + /** + * @param entity + * @param cluster + * @param states + * @param start + * @param end + * @return - A map of state and the no. of instances in that state. + * @throws StateStoreException + */ + Map getExecutionInstanceSummary(Entity entity, String cluster, + DateTime start, DateTime end) throws StateStoreException; /** * @param entity * @param cluster http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java index 3def14a..194819e 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java @@ -17,6 +17,8 @@ */ package org.apache.falcon.state.store.jdbc; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.EntityUtil; @@ -301,4 +303,22 @@ public final class BeanMapperUtil { IOUtils.closeQuietly(out); } } + + /** + * @param summary + * @return A map of state and count given the JQL result. + */ + public static Map getInstanceStateSummary(Collection summary) { + Map stateSummary = new HashMap<>(); + if (summary != null && !summary.isEmpty()) { + for (Object[] projection : summary) { + // Has to have 2 columns (state and count), else Array will be out of bounds. + if (projection.length == 2) { + stateSummary.put(InstanceState.STATE.valueOf((String)projection[0]), + Long.valueOf(((Number)projection[1]).longValue())); + } + } + } + return stateSummary; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java index 7f7b966..e8385b1 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java @@ -50,7 +50,8 @@ import java.sql.Timestamp; @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"), @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"), @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"), - @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a") + @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"), + @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState") }) //RESUME CHECKSTYLE CHECK LineLengthCheck @Table(name = "INSTANCES") http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java index 2eafbce..1c07286 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.state.store.jdbc; +import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.StateStoreException; @@ -357,6 +358,22 @@ public final class JDBCStateStore extends AbstractStateStore { } @Override + public Map getExecutionInstanceSummary(Entity entity, String cluster, + DateTime start, DateTime end) throws StateStoreException { + String entityKey = new EntityClusterID(entity, cluster).getEntityID().getKey(); + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE"); + q.setParameter("entityId", entityKey); + q.setParameter("cluster", cluster); + q.setParameter("startTime", new Timestamp(start.getMillis())); + q.setParameter("endTime", new Timestamp(end.getMillis())); + List result = q.getResultList(); + entityManager.close(); + + return BeanMapperUtil.getInstanceStateSummary(result); + } + + @Override public Collection getExecutionInstances(Entity entity, String cluster, Collection states, DateTime start, DateTime end) throws StateStoreException { http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index c9d6b86..7ce2420 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -18,6 +18,7 @@ package org.apache.falcon.workflow.engine; +import java.util.HashMap; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.LifeCycle; @@ -398,7 +399,27 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { @Override public InstancesSummaryResult getSummary(Entity entity, Date start, Date end, List lifeCycles) throws FalconException { - throw new FalconException("Not yet Implemented"); + Set clusters = EntityUtil.getClustersDefinedInColos(entity); + List instanceSummaries = new ArrayList<>(); + + // Iterate over entity clusters + for (String cluster : clusters) { + LOG.debug("Retrieving summary of instances for cluster : {}", cluster); + Map summaries = STATE_STORE.getExecutionInstanceSummary(entity, cluster, + new DateTime(start), new DateTime(end)); + Map summaryMap = new HashMap<>(); + // Iterate over the map and convert STATE to String + for (Map.Entry summary : summaries.entrySet()) { + summaryMap.put(summary.getKey().name(), summary.getValue()); + } + instanceSummaries.add(new InstancesSummaryResult.InstanceSummary(cluster, summaryMap)); + } + + InstancesSummaryResult instancesSummaryResult = + new InstancesSummaryResult(APIResult.Status.SUCCEEDED, JobAction.SUMMARY.name()); + instancesSummaryResult.setInstancesSummary(instanceSummaries. + toArray(new InstancesSummaryResult.InstanceSummary[instanceSummaries.size()])); + return instancesSummaryResult; } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java index 2a383cc..d597e27 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java +++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.state.service.store; +import java.util.Map; import org.apache.commons.lang.RandomStringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.cluster.util.EmbeddedCluster; @@ -445,7 +446,41 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase { Assert.assertEquals(instances.size(), 0); } + @Test + public void testGetExecutionSummaryWithRange() throws Exception { + storeEntity(EntityType.CLUSTER, "testCluster"); + storeEntity(EntityType.FEED, "clicksFeed"); + storeEntity(EntityType.FEED, "clicksSummary"); + + long instance1Time = System.currentTimeMillis() - 180000; + long instance2Time = System.currentTimeMillis(); + EntityState entityState = getEntityState(EntityType.PROCESS, "clicksProcess"); + ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + instance1Time, "cluster1", instance1Time); + InstanceState instanceState1 = new InstanceState(processExecutionInstance1); + instanceState1.setCurrentState(InstanceState.STATE.RUNNING); + ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + instance2Time, "cluster1", instance2Time); + InstanceState instanceState2 = new InstanceState(processExecutionInstance2); + instanceState2.setCurrentState(InstanceState.STATE.SUCCEEDED); + + stateStore.putExecutionInstance(instanceState1); + stateStore.putExecutionInstance(instanceState2); + + + Map summary = stateStore.getExecutionInstanceSummary(entityState.getEntity(), + "cluster1", new DateTime(instance1Time), new DateTime(instance1Time + 60000)); + Assert.assertEquals(summary.size(), 1); + Assert.assertEquals(summary.get(InstanceState.STATE.RUNNING).longValue(), 1L); + + summary = stateStore.getExecutionInstanceSummary(entityState.getEntity(), + "cluster1", new DateTime(instance2Time), new DateTime(instance2Time + 60000)); + Assert.assertEquals(summary.size(), 1); + Assert.assertEquals(summary.get(InstanceState.STATE.SUCCEEDED).longValue(), 1L); + } private void initInstanceState(InstanceState instanceState) { instanceState.setCurrentState(InstanceState.STATE.READY); http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index a82cf03..37221f3 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -324,6 +324,12 @@ public class FalconUnitClient extends AbstractFalconClient { String colo, List lifeCycles, String filterBy, String orderBy, String sortOrder, String doAsUser) throws FalconCLIException { + if (StringUtils.isBlank(orderBy)) { + orderBy = DEFAULT_ORDERBY; + } + if (StringUtils.isBlank(sortOrder)) { + sortOrder = DEFAULT_SORTED_ORDER; + } return localInstanceManager.getSummary(type, entity, start, end, colo, lifeCycles, filterBy, orderBy, sortOrder); } http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java index b1c8ce0..b06725f 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java @@ -144,4 +144,28 @@ public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJe Assert.assertEquals(result.getInstances()[0].getInstance(), "2012-04-22T00:00Z"); Assert.assertEquals(result.getInstances()[2].getInstance(), START_INSTANCE); } + + @Test + public void testInstanceSummary() throws Exception { + UnitTestContext context = new UnitTestContext(); + Map overlay = context.getUniqueOverlay(); + + setupProcessExecution(context, overlay, 3); + + String processName = overlay.get(PROCESS_NAME); + String colo = overlay.get(COLO); + + waitForStatus(EntityType.PROCESS.toString(), processName, + START_INSTANCE, InstancesResult.WorkflowStatus.RUNNING); + + InstancesSummaryResult result = falconUnitClient.getSummaryOfInstances(EntityType.PROCESS.toString(), + processName, START_INSTANCE, "2012-04-23T00:00Z", colo, null, null, null, null, null); + + Assert.assertEquals(result.getInstancesSummary().length, 1); + Assert.assertEquals(result.getInstancesSummary()[0].getCluster(), overlay.get(CLUSTER)); + Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().size(), 2); + // Parallelism is 2 + Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("RUNNING").longValue(), 2L); + Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("READY").longValue(), 1L); + } }