falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suh...@apache.org
Subject falcon git commit: FALCON-949 Force update feature. Contributed by pavan kumar kolamuri
Date Mon, 16 Feb 2015 09:00:02 GMT
Repository: falcon
Updated Branches:
  refs/heads/master fd10aa410 -> 280ea92a7


FALCON-949 Force update feature. Contributed by pavan kumar kolamuri


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/280ea92a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/280ea92a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/280ea92a

Branch: refs/heads/master
Commit: 280ea92a7395b07f45d31cd64a1b892de43505e9
Parents: fd10aa4
Author: Suhas Vasu <suhas.v@inmobi.com>
Authored: Mon Feb 16 14:29:43 2015 +0530
Committer: Suhas Vasu <suhas.v@inmobi.com>
Committed: Mon Feb 16 14:29:43 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../java/org/apache/falcon/cli/FalconCLI.java   | 10 +++-
 .../org/apache/falcon/client/FalconClient.java  | 17 +++++-
 .../workflow/engine/AbstractWorkflowEngine.java |  2 +
 docs/src/site/twiki/FalconCLI.twiki             |  6 +++
 docs/src/site/twiki/restapi/EntityTouch.twiki   | 29 +++++++++++
 .../workflow/engine/OozieWorkflowEngine.java    | 15 ++++++
 .../falcon/resource/AbstractEntityManager.java  |  7 ++-
 .../AbstractSchedulableEntityManager.java       | 27 ++++++++++
 .../proxy/SchedulableEntityManagerProxy.java    | 23 +++++++++
 .../resource/SchedulableEntityManager.java      | 11 ++++
 .../java/org/apache/falcon/cli/FalconCLIIT.java |  3 ++
 .../falcon/resource/EntityManagerJerseyIT.java  | 54 ++++++++++++++++++++
 13 files changed, 202 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 349f4d4..8d16073 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-949 Force update feature (pavan kumar kolamuri via Suhas Vasu)
 
   IMPROVEMENTS
    FALCON-263 Adding documentation for params api (Ajay Yadav via Srikanth 

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 3620c3b..ac76a9c 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -84,6 +84,7 @@ public class FalconCLI {
     public static final String DEFINITION_OPT = "definition";
     public static final String DEPENDENCY_OPT = "dependency";
     public static final String LIST_OPT = "list";
+    public static final String TOUCH_OPT = "touch";
 
     public static final String FIELDS_OPT = "fields";
     public static final String FILTER_BY_OPT = "filterBy";
@@ -453,6 +454,10 @@ public class FalconCLI {
                                 entityType, cluster, start, end, fields, filterBy,
                                 filterTags,
                                 orderBy, sortOrder, offset, numResults, numInstances));
+        } else if (optionsList.contains(TOUCH_OPT)) {
+            validateNotEmpty(entityName, ENTITY_NAME_OPT);
+            colo = getColo(colo);
+            result = client.touch(entityType, entityName, colo).getMessage();
         } else if (optionsList.contains(HELP_CMD)) {
             OUT.get().println("Falcon Help");
         } else {
@@ -613,9 +618,11 @@ public class FalconCLI {
         Option dependency = new Option(DEPENDENCY_OPT, false,
                 "Gets the dependencies of entity");
         Option list = new Option(LIST_OPT, false,
-                "List entities registerd for a type");
+                "List entities registered for a type");
         Option entitySummary = new Option(SUMMARY_OPT, false,
                 "Get summary of instances for list of entities");
+        Option touch = new Option(TOUCH_OPT, false,
+                "Force update the entity in workflow engine(even without any changes to entity)");
 
         OptionGroup group = new OptionGroup();
         group.addOption(submit);
@@ -631,6 +638,7 @@ public class FalconCLI {
         group.addOption(dependency);
         group.addOption(list);
         group.addOption(entitySummary);
+        group.addOption(touch);
 
         Option url = new Option(URL_OPTION, true, "Falcon URL");
         Option entityType = new Option(ENTITY_TYPE_OPT, true,

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 5064e46..86397c4 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -190,7 +190,8 @@ public class FalconClient {
         DEFINITION("api/entities/definition/", HttpMethod.GET, MediaType.TEXT_XML),
         LIST("api/entities/list/", HttpMethod.GET, MediaType.TEXT_XML),
         SUMMARY("api/entities/summary", HttpMethod.GET, MediaType.APPLICATION_JSON),
-        DEPENDENCY("api/entities/dependencies/", HttpMethod.GET, MediaType.TEXT_XML);
+        DEPENDENCY("api/entities/dependencies/", HttpMethod.GET, MediaType.TEXT_XML),
+        TOUCH("api/entities/touch", HttpMethod.POST, MediaType.TEXT_XML);
 
         private String path;
         private String method;
@@ -381,6 +382,20 @@ public class FalconClient {
                 orderBy, sortOrder, offset, numResults, numInstances);
     }
 
+    public APIResult touch(String entityType, String entityName, String colo) throws FalconCLIException
{
+        Entities operation = Entities.TOUCH;
+        WebResource resource = service.path(operation.path).path(entityType).path(entityName);
+        if (colo != null) {
+            resource = resource.queryParam("colo", colo);
+        }
+        ClientResponse clientResponse = resource
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                .accept(operation.mimeType).type(MediaType.TEXT_XML)
+                .method(operation.method, ClientResponse.class);
+        checkIfSuccessful(clientResponse);
+        return parseAPIResult(clientResponse);
+    }
+
     public InstancesResult getRunningInstances(String type, String entity, String colo, List<LifeCycle>
lifeCycles,
                                       String filterBy, String orderBy, String sortOrder,
                                       Integer offset, Integer numResults) throws FalconCLIException
{

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index f5b142b..6b10679 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -90,6 +90,8 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract String update(Entity oldEntity, Entity newEntity, String cluster) throws
FalconException;
 
+    public abstract String touch(Entity entity, String cluster) throws FalconException;
+
     public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;
 
     public abstract Properties getWorkflowProperties(String cluster, String jobId) throws
FalconException;

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index d37cf8c..547aa7d 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -83,6 +83,12 @@ $FALCON_HOME/bin/falcon entity  -type [feed|process] -name <<name>>
-update -fil
 Example:
 $FALCON_HOME/bin/falcon entity -type process -name HourlyReportsGenerator -update -file /process/definition.xml
 
+---+++Touch
+
+Force Update operation allows an already submitted/scheduled entity to be updated.
+
+Usage:
+$FALCON_HOME/bin/falcon entity  -type [feed|process] -name <<name>> -touch
 
 ---+++Status
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/docs/src/site/twiki/restapi/EntityTouch.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityTouch.twiki b/docs/src/site/twiki/restapi/EntityTouch.twiki
new file mode 100644
index 0000000..ffc7967
--- /dev/null
+++ b/docs/src/site/twiki/restapi/EntityTouch.twiki
@@ -0,0 +1,29 @@
+---++ POST  api/entities/touch/:entity-type/:entity-name
+   * <a href="#Description">Description</a>
+   * <a href="#Parameters">Parameters</a>
+   * <a href="#Results">Results</a>
+   * <a href="#Examples">Examples</a>
+
+---++ Description
+Force updates the entity.
+
+---++ Parameters
+   * :entity-type can be feed or process.
+   * :entity-name is name of the feed or process.
+
+---++ Results
+Result of the validation.
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+POST http://localhost:15000/api/entities/touch/process/SampleProcess
+</verbatim>
+---+++ Result
+<verbatim>
+{
+    "requestId": "touch\/default\/d6aaa328-6836-4818-a212-515bb43d8b86\n\n",
+    "message": "touch\/default\/SampleProcess updated successfully\n\n",
+    "status": "SUCCEEDED"
+}
+</verbatim>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index ce292bd..fd1cdac 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -1069,6 +1069,21 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return result.toString();
     }
 
+    @Override
+    public String touch(Entity entity, String cluster) throws FalconException {
+        BundleJob bundle = findLatestBundle(entity, cluster);
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+        StringBuilder result = new StringBuilder();
+        if (bundle != MISSING) {
+            LOG.info("Updating entity {} for cluster: {}, bundle: {}", entity.toShortString(),
cluster, bundle.getId());
+            String output = updateInternal(entity, entity, clusterEntity, bundle, CurrentUser.getUser());
+            result.append(output).append("\n");
+            LOG.info("Entity update complete: {} for cluster {}, bundle: {}", entity.toShortString(),
cluster,
+                    bundle.getId());
+        }
+        return result.toString();
+    }
+
     private String getUpdateString(Entity entity, Date date, BundleJob oldBundle, BundleJob
newBundle) {
         StringBuilder builder = new StringBuilder();
         builder.append(entity.toShortString()).append("/Effective Time: ").append(SchemaHelper.formatDateUTC(date));

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 0d34ef3..caa9a74 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -121,10 +121,15 @@ public abstract class AbstractEntityManager {
 
     protected Set<String> getColosFromExpression(String coloExpr, String type, String
entity) {
         Set<String> colos;
+        final Set<String> applicableColos = getApplicableColos(type, entity);
         if (coloExpr == null || coloExpr.equals("*") || coloExpr.isEmpty()) {
-            colos = getApplicableColos(type, entity);
+            colos = applicableColos;
         } else {
             colos = new HashSet<String>(Arrays.asList(coloExpr.split(",")));
+            if (!applicableColos.containsAll(colos)) {
+                throw FalconWebException.newException("Given colos not applicable for entity
operation",
+                        Response.Status.BAD_REQUEST);
+            }
         }
         return colos;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index b8d12ee..adfef35 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import java.util.*;
@@ -224,6 +225,32 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
                 entitySummaries.toArray(new EntitySummaryResult.EntitySummary[entitySummaries.size()]));
     }
 
+    /**
+     * Force updates an entity.
+     *
+     * @param type
+     * @param entityName
+     * @return APIResult
+     */
+    public APIResult touch(@Dimension("entityType") @PathParam("type") String type,
+                           @Dimension("entityName") @PathParam("entity") String entityName,
+                           @Dimension("colo") @QueryParam("colo") String colo) {
+        checkColo(colo);
+        StringBuilder result = new StringBuilder();
+        try {
+            Entity entity = EntityUtil.getEntity(type, entityName);
+            Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+            for (String cluster : clusters) {
+                result.append(getWorkflowEngine().touch(entity, cluster));
+            }
+        } catch (Throwable e) {
+            LOG.error("Touch failed", e);
+            throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
+    }
+
+
     private void validateTypeForEntitySummary(String type) {
         EntityType entityType = EntityType.getEnum(type);
         if (!entityType.isSchedulable()) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 34ff0f7..5f711ee 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -274,6 +274,29 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
         return consolidateResult(results, APIResult.class);
     }
 
+    @POST
+    @Path("touch/{type}/{entity}")
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    @Monitored(event = "touch")
+    @Override
+    public APIResult touch(
+            @Dimension("entityType") @PathParam("type") final String type,
+            @Dimension("entityName") @PathParam("entity") final String entityName,
+            @Dimension("colo") @QueryParam("colo") final String coloExpr) {
+        final Set<String> colosFromExp = getColosFromExpression(coloExpr, type, entityName);
+        return new EntityProxy(type, entityName) {
+            @Override
+            protected Set<String> getColosToApply() {
+                return colosFromExp;
+            }
+
+            @Override
+            protected APIResult doExecute(String colo) throws FalconException {
+                return getEntityManager(colo).invoke("touch", type, entityName, colo);
+            }
+        }.execute();
+    }
+
     @GET
     @Path("status/{type}/{entity}")
     @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index 2ec7f66..a83f0cf 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -151,4 +151,15 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager
{
         return super.validate(request, type);
     }
 
+    @POST
+    @Path("touch/{type}/{entity}")
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Monitored(event = "touch")
+    @Override
+    public APIResult touch(@Dimension("entityType") @PathParam("type") String type,
+                           @Dimension("entityName") @PathParam("entity") String entityName,
+                           @Dimension("colo") @QueryParam("colo") String colo) {
+        return super.touch(type, entityName, colo);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 118003f..7512302 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -119,6 +119,9 @@ public class FalconCLIIT {
 
         Assert.assertEquals(executeWithURL("entity -update -name " + overlay.get("processName")
                 + " -type process -file " + filePath), 0);
+
+        Assert.assertEquals(0,
+                executeWithURL("entity -touch -name " + overlay.get("processName") + " -type
process"));
     }
 
     public void testValidateValidCommands() throws Exception {

http://git-wip-us.apache.org/repos/asf/falcon/blob/280ea92a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index 40f8e04..c6fd420 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -140,6 +140,16 @@ public class EntityManagerJerseyIT {
                 .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
     }
 
+    private ClientResponse touch(TestContext context, Entity entity) {
+        WebResource resource = context.service.path("api/entities/touch/"
+                + entity.getEntityType().name().toLowerCase() + "/" + entity.getName());
+        ClientResponse clientResponse = resource
+                .header("Cookie", context.getAuthenticationToken())
+                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+                .post(ClientResponse.class);
+        return clientResponse;
+    }
+
     @Test
     public void testUpdateCheckUser() throws Exception {
         TestContext context = newContext();
@@ -343,6 +353,50 @@ public class EntityManagerJerseyIT {
         Assert.assertEquals(bundles.size(), 1);
     }
 
+    @Test
+    public void testTouchEntity() throws Exception {
+        TestContext context = newContext();
+        context.scheduleProcess();
+        OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING);
+        List<BundleJob> bundles = OozieTestUtils.getBundles(context);
+        Assert.assertEquals(bundles.size(), 1);
+        ProxyOozieClient ozClient = OozieTestUtils.getOozieClient(context.getCluster().getCluster());
+        String bundle = bundles.get(0).getId();
+        String coordId = ozClient.getBundleJobInfo(bundle).getCoordinators().get(0).getId();
+
+        //Update end time of process required for touch
+        Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
+        updateEndtime(process);
+        ClientResponse response = update(context, process, null);
+        context.assertSuccessful(response);
+        bundles = OozieTestUtils.getBundles(context);
+        Assert.assertEquals(bundles.size(), 1);
+
+        //Calling force update
+        response = touch(context, process);
+        context.assertSuccessful(response);
+        OozieTestUtils.waitForBundleStart(context, Status.PREP, Status.RUNNING);
+
+        //Assert that touch creates new bundle and old coord is running
+        bundles = OozieTestUtils.getBundles(context);
+        Assert.assertEquals(bundles.size(), 2);
+        CoordinatorJob coord = ozClient.getCoordJobInfo(coordId);
+        Assert.assertTrue(coord.getStatus() == Status.RUNNING || coord.getStatus() == Status.SUCCEEDED);
+
+        //Assert on new bundle/coord
+        String newBundle = null;
+        for (BundleJob myBundle : bundles) {
+            if (!myBundle.getId().equals(bundle)) {
+                newBundle = myBundle.getId();
+                break;
+            }
+        }
+
+        assert newBundle != null;
+        coord = ozClient.getCoordJobInfo(ozClient.getBundleJobInfo(newBundle).getCoordinators().get(0).getId());
+        Assert.assertTrue(coord.getStatus() == Status.RUNNING || coord.getStatus() == Status.PREP);
+    }
+
     public void testStatus() throws Exception {
         TestContext context = newContext();
         ClientResponse response;


Mime
View raw message