falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject [1/2] falcon git commit: FALCON-1039 Add instance dependency API in falcon. Contributed by Ajay Yadava
Date Thu, 04 Jun 2015 06:14:03 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 42f175a12 -> 9fd86b786


http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 678235e..a721666 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -553,9 +553,7 @@ public abstract class AbstractEntityManager {
 
         try {
             Entity entityObj = EntityUtil.getEntity(type, entityName);
-            Set<Entity> dependents = EntityGraph.get().getDependents(entityObj);
-            Entity[] dependentEntities = dependents.toArray(new Entity[dependents.size()]);
-            return new EntityList(dependentEntities, entityObj);
+            return EntityUtil.getEntityDependencies(entityObj);
         } catch (Exception e) {
             LOG.error("Unable to get dependencies for entityName {} ({})", entityName, type,
e);
             throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 42907c8..72f9fe4 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -19,16 +19,24 @@
 package org.apache.falcon.resource;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.*;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconWebException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Pair;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.parser.ValidationException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.logging.LogProvider;
 import org.apache.falcon.resource.InstancesResult.Instance;
+import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +45,17 @@ import javax.servlet.ServletInputStream;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 
 /**
  * A base class for managing Entity's Instance operations.
@@ -160,6 +178,63 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager
{
         }
     }
 
+
+    public InstanceDependencyResult getInstanceDependencies(String entityType, String entityName,
+                                                  String instanceTimeString, String colo)
{
+        checkColo(colo);
+        EntityType type = checkType(entityType);
+        Set<SchedulableEntityInstance> result = new HashSet<>();
+
+        try {
+            Date instanceTime = EntityUtil.parseDateUTC(instanceTimeString);
+            for (String clusterName : DeploymentUtil.getCurrentClusters()) {
+                Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
+                switch (type) {
+
+                case PROCESS:
+                    Process process = EntityUtil.getEntity(EntityType.PROCESS, entityName);
+                    org.apache.falcon.entity.v0.process.Cluster pCluster = ProcessHelper.getCluster(process,
+                            clusterName);
+                    if (pCluster != null) {
+                        Set<SchedulableEntityInstance> inputFeeds = ProcessHelper.getInputFeedInstances(process,
+                                instanceTime, cluster, true);
+                        Set<SchedulableEntityInstance> outputFeeds = ProcessHelper.getOutputFeedInstances(process,
+                                instanceTime, cluster);
+                        result.addAll(inputFeeds);
+                        result.addAll(outputFeeds);
+                    }
+                    break;
+
+                case FEED:
+                    Feed feed = EntityUtil.getEntity(EntityType.FEED, entityName);
+                    org.apache.falcon.entity.v0.feed.Cluster fCluster = FeedHelper.getCluster(feed,
clusterName);
+                    if (fCluster != null) {
+                        Set<SchedulableEntityInstance> consumers = FeedHelper.getConsumerInstances(feed,
instanceTime,
+                                cluster);
+                        SchedulableEntityInstance producer = FeedHelper.getProducerInstance(feed,
instanceTime,
+                                cluster);
+                        result.addAll(consumers);
+                        if (producer != null) {
+                            result.add(producer);
+                        }
+                    }
+                    break;
+
+                default:
+                    throw FalconWebException.newInstanceException("Instance dependency isn't
supported for type: "
+                                + entityType, Response.Status.BAD_REQUEST);
+                }
+            }
+
+        } catch (Throwable throwable) {
+            throw FalconWebException.newInstanceException(throwable, Response.Status.BAD_REQUEST);
+        }
+
+        InstanceDependencyResult res = new InstanceDependencyResult(APIResult.Status.SUCCEEDED,
"Success!");
+        res.setDependencies(result.toArray(new SchedulableEntityInstance[0]));
+        return res;
+    }
+
     public InstancesSummaryResult getSummary(String type, String entity, String startStr,
String endStr,
                                              String colo, List<LifeCycle> lifeCycles)
{
         checkColo(colo);

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index 68219fb..1a8396c 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -27,18 +27,28 @@ import org.apache.falcon.monitors.Monitored;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.AbstractInstanceManager;
 import org.apache.falcon.resource.FeedInstanceResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
 import org.apache.falcon.resource.channel.Channel;
 import org.apache.falcon.resource.channel.ChannelFactory;
 
 import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.*;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * A proxy implementation of the entity instance operations.
@@ -338,6 +348,29 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
             }
         }.execute(colo, type, entity);
     }
+
+
+    @GET
+    @Path("dependencies/{type}/{entity}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Monitored(event = "instance-dependency")
+    public InstanceDependencyResult instanceDependencies(
+            @Dimension("type") @PathParam("type") final String entityType,
+            @Dimension("entityName") @PathParam("entity") final String entityName,
+            @Dimension("instanceTime") @QueryParam("instanceTime") final String instanceTimeStr,
+            @Dimension("colo") @QueryParam("colo") String colo) {
+
+        return new InstanceProxy<InstanceDependencyResult>(InstanceDependencyResult.class)
{
+
+            @Override
+            protected InstanceDependencyResult doExecute(String colo) throws FalconException
{
+                return getInstanceManager(colo).invoke("instanceDependencies",
+                        entityType, entityName, instanceTimeStr, colo);
+            }
+
+        }.execute(colo, entityType, entityName);
+    }
+
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     private abstract class InstanceProxy<T extends APIResult> {

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
index dc533a2..c2ac5b2 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
@@ -23,7 +23,13 @@ import org.apache.falcon.monitors.Dimension;
 import org.apache.falcon.monitors.Monitored;
 
 import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.*;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import java.util.List;
@@ -230,4 +236,16 @@ public class InstanceManager extends AbstractInstanceManager {
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
+
+    @GET
+    @Path("dependencies/{type}/{entity}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Monitored(event = "instance-dependency")
+    public InstanceDependencyResult instanceDependencies(
+            @Dimension("type") @PathParam("type") String entityType,
+            @Dimension("entityName") @PathParam("entity") String entityName,
+            @Dimension("instanceTime") @QueryParam("instanceTime") String instanceTimeStr,
+            @Dimension("colo") @QueryParam("colo") String colo) {
+        return super.getInstanceDependencies(entityType, entityName, instanceTimeStr, colo);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index fa04add..ed6f44e 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -23,7 +23,14 @@ import org.apache.falcon.monitors.Dimension;
 import org.apache.falcon.monitors.Monitored;
 
 import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 797b595..90acb59 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -287,6 +287,10 @@ public class FalconCLIIT {
         Assert.assertEquals(executeWithURL("entity -schedule -type feed -name " + overlay.get("outputFeedName")),
0);
         OozieTestUtils.waitForProcessWFtoStart(context);
 
+        //Test the dependency command
+        Assert.assertEquals(executeWithURL("instance -dependency -type feed -name " + overlay.get("inputFeedName")
+                + " -instanceTime 2010-01-01T00:00Z"), 0);
+
         Assert.assertEquals(executeWithURL("instance -status -type feed -name "
                 + overlay.get("outputFeedName")
                 + " -start " + START_INSTANCE), 0);


Mime
View raw message