falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-1434 Enhance schedule API to accept key-value properties(Contributed by Pallavi Rao)
Date Fri, 18 Sep 2015 07:41:09 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 5d8b36c16 -> b806b32fd


FALCON-1434 Enhance schedule API to accept key-value properties(Contributed by Pallavi Rao)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/b806b32f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/b806b32f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/b806b32f

Branch: refs/heads/master
Commit: b806b32fd6829b4f6f49e612a6668df55c49bd03
Parents: 5d8b36c
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Fri Sep 18 12:20:12 2015 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Fri Sep 18 12:20:12 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../java/org/apache/falcon/cli/FalconCLI.java   |  9 +++--
 .../falcon/client/AbstractFalconClient.java     |  4 +--
 .../org/apache/falcon/client/FalconClient.java  | 36 ++++++++++++--------
 .../org/apache/falcon/entity/EntityUtil.java    | 23 +++++++++++++
 .../workflow/engine/AbstractWorkflowEngine.java |  3 +-
 .../apache/falcon/entity/EntityUtilTest.java    | 30 ++++++++++++++++
 .../workflow/engine/OozieWorkflowEngine.java    |  3 +-
 .../AbstractSchedulableEntityManager.java       | 19 ++++++-----
 .../proxy/SchedulableEntityManagerProxy.java    | 11 +++---
 .../apache/falcon/unit/FalconUnitClient.java    |  8 ++---
 .../apache/falcon/unit/FalconUnitTestBase.java  | 13 ++++---
 .../org/apache/falcon/unit/TestFalconUnit.java  |  3 +-
 .../resource/SchedulableEntityManager.java      |  5 +--
 .../java/org/apache/falcon/cli/FalconCLIIT.java |  3 +-
 .../falcon/resource/EntityManagerJerseyIT.java  |  4 +--
 .../org/apache/falcon/resource/TestContext.java | 20 ++++++-----
 17 files changed, 139 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 013e0fd..25f02f0 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ Trunk (Unreleased)
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1434 Enhance schedule API to accept key-value properties(Pallavi Rao)   
+
     FALCON-1425 Provide Email based plugin to send Notification once instance completed(Peeyush
Bishnoi via Ajay Yadava)
 
     FALCON-1205 SLAService to keep track of missing SLAs for feeds(Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index e684678..c914649 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -91,6 +91,7 @@ public class FalconCLI {
     public static final String LIST_OPT = "list";
     public static final String TOUCH_OPT = "touch";
     public static final String SKIPDRYRUN_OPT = "skipDryRun";
+    public static final String PROPS_OPT = "properties";
 
     public static final String FIELDS_OPT = "fields";
     public static final String FILTER_BY_OPT = "filterBy";
@@ -441,6 +442,8 @@ public class FalconCLI {
             skipDryRun = true;
         }
 
+        String userProps = commandLine.getOptionValue(PROPS_OPT);
+
         EntityType entityTypeEnum = null;
         if (optionsList.contains(LIST_OPT)) {
             if (entityType == null) {
@@ -476,7 +479,7 @@ public class FalconCLI {
         } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
-            result = client.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser).getMessage();
+            result = client.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser,
userProps).getMessage();
         } else if (optionsList.contains(VALIDATE_OPT)) {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
@@ -484,7 +487,7 @@ public class FalconCLI {
         } else if (optionsList.contains(SCHEDULE_OPT)) {
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
             colo = getColo(colo);
-            result = client.schedule(entityTypeEnum, entityName, colo, skipDryRun, doAsUser).getMessage();
+            result = client.schedule(entityTypeEnum, entityName, colo, skipDryRun, doAsUser,
userProps).getMessage();
         } else if (optionsList.contains(SUSPEND_OPT)) {
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
             colo = getColo(colo);
@@ -759,6 +762,7 @@ public class FalconCLI {
         Option path = new Option(PATH_OPT, true, "Path for a feed's instance");
         Option skipDryRun = new Option(SKIPDRYRUN_OPT, false, "skip dry run in workflow engine");
         Option doAs = new Option(DO_AS_OPT, true, "doAs user");
+        Option userProps = new Option(PROPS_OPT, true, "User supplied comma separated key
value properties");
 
         entityOptions.addOption(url);
         entityOptions.addOption(path);
@@ -782,6 +786,7 @@ public class FalconCLI {
         entityOptions.addOption(numInstances);
         entityOptions.addOption(skipDryRun);
         entityOptions.addOption(doAs);
+        entityOptions.addOption(userProps);
 
         return entityOptions;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 1146011..265e08c 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -48,7 +48,7 @@ public abstract class AbstractFalconClient {
      * @return
      * @throws FalconCLIException
      */
-    public abstract APIResult schedule(EntityType entityType, String entityName,
-                                       String colo, Boolean skipDryRun, String doAsuser)
throws FalconCLIException;
+    public abstract APIResult schedule(EntityType entityType, String entityName, String colo,
Boolean skipDryRun,
+                                        String doAsuser, String properties) throws FalconCLIException;
 
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 6075f5c..981559b 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -287,32 +287,32 @@ public class FalconClient extends AbstractFalconClient {
     }
 
     public APIResult schedule(EntityType entityType, String entityName, String colo,
-                              Boolean skipDryRun, String doAsUser)
+                              Boolean skipDryRun, String doAsUser, String properties)
         throws FalconCLIException {
 
         return sendEntityRequest(Entities.SCHEDULE, entityType, entityName,
-                colo, skipDryRun, doAsUser);
+                colo, skipDryRun, doAsUser, properties);
 
     }
 
     public APIResult suspend(EntityType entityType, String entityName, String colo, String
doAsUser)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo, null, doAsUser);
+        return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo, null, doAsUser,
null);
 
     }
 
     public APIResult resume(EntityType entityType, String entityName, String colo, String
doAsUser)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.RESUME, entityType, entityName, colo, null, doAsUser);
+        return sendEntityRequest(Entities.RESUME, entityType, entityName, colo, null, doAsUser,
null);
 
     }
 
     public APIResult delete(EntityType entityType, String entityName, String doAsUser)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.DELETE, entityType, entityName, null, null, doAsUser);
+        return sendEntityRequest(Entities.DELETE, entityType, entityName, null, null, doAsUser,
null);
 
     }
 
@@ -321,7 +321,7 @@ public class FalconClient extends AbstractFalconClient {
 
         InputStream entityStream = getServletInputStream(filePath);
         return sendEntityRequestWithObject(Entities.VALIDATE, entityType,
-                entityStream, null, skipDryRun, doAsUser);
+                entityStream, null, skipDryRun, doAsUser, null);
     }
 
     public APIResult submit(String entityType, String filePath, String doAsUser)
@@ -329,7 +329,7 @@ public class FalconClient extends AbstractFalconClient {
 
         InputStream entityStream = getServletInputStream(filePath);
         return sendEntityRequestWithObject(Entities.SUBMIT, entityType,
-                entityStream, null, null, doAsUser);
+                entityStream, null, null, doAsUser, null);
     }
 
     public APIResult update(String entityType, String entityName, String filePath,
@@ -353,18 +353,18 @@ public class FalconClient extends AbstractFalconClient {
     }
 
     public APIResult submitAndSchedule(String entityType, String filePath,
-                                       Boolean skipDryRun, String doAsUser)
+                                       Boolean skipDryRun, String doAsUser, String properties)
         throws FalconCLIException {
 
         InputStream entityStream = getServletInputStream(filePath);
         return sendEntityRequestWithObject(Entities.SUBMITandSCHEDULE,
-                entityType, entityStream, null, skipDryRun, doAsUser);
+                entityType, entityStream, null, skipDryRun, doAsUser, properties);
     }
 
     public APIResult getStatus(EntityType entityType, String entityName, String colo, String
doAsUser)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.STATUS, entityType, entityName, colo, null, doAsUser);
+        return sendEntityRequest(Entities.STATUS, entityType, entityName, colo, null, doAsUser,
null);
     }
 
     public Entity getDefinition(String entityType, String entityName, String doAsUser)
@@ -641,7 +641,7 @@ public class FalconClient extends AbstractFalconClient {
 
     private APIResult sendEntityRequest(Entities entities, EntityType entityType,
                                      String entityName, String colo, Boolean skipDryRun,
-                                     String doAsUser) throws FalconCLIException {
+                                     String doAsUser, String properties) throws FalconCLIException
{
 
         WebResource resource = service.path(entities.path)
                 .path(entityType.toString().toLowerCase()).path(entityName);
@@ -655,6 +655,10 @@ public class FalconClient extends AbstractFalconClient {
             resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
         }
 
+        if (StringUtils.isNotEmpty(properties)) {
+            resource = resource.queryParam("properties", properties);
+        }
+
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
@@ -788,8 +792,8 @@ public class FalconClient extends AbstractFalconClient {
     }
 
     private APIResult sendEntityRequestWithObject(Entities entities, String entityType,
-                                               Object requestObject, String colo,
-                                               Boolean skipDryRun, String doAsUser) throws
FalconCLIException {
+                                               Object requestObject, String colo, Boolean
skipDryRun,
+                                               String doAsUser, String properties) throws
FalconCLIException {
         WebResource resource = service.path(entities.path)
                 .path(entityType);
         if (colo != null) {
@@ -803,6 +807,10 @@ public class FalconClient extends AbstractFalconClient {
         if (StringUtils.isNotEmpty(doAsUser)) {
             resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
         }
+
+        if (StringUtils.isNotEmpty(properties)) {
+            resource = resource.queryParam("properties", properties);
+        }
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
@@ -1094,7 +1102,7 @@ public class FalconClient extends AbstractFalconClient {
                 RecipeTool.main(args);
             }
             validate(EntityType.PROCESS.toString(), processFile, skipDryRun, doAsUser);
-            return submitAndSchedule(EntityType.PROCESS.toString(), processFile, skipDryRun,
doAsUser);
+            return submitAndSchedule(EntityType.PROCESS.toString(), processFile, skipDryRun,
doAsUser, null);
         } catch (Exception e) {
             throw new FalconCLIException(e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 646afc3..ad41674 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -950,4 +950,27 @@ public final class EntityUtil {
             throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
         }
     }
+
+
+    /**
+     * @param properties - String of format key1:value1, key2:value2
+     * @return
+     */
+    public static Map<String, String> getPropertyMap(String properties) {
+        Map<String, String> props = new HashMap<>();
+        if (StringUtils.isNotEmpty(properties)) {
+            String[] kvPairs = properties.split(",");
+            for (String kvPair : kvPairs) {
+                String[] keyValue = kvPair.trim().split(":", 2);
+                if (keyValue.length == 2 && !keyValue[0].trim().isEmpty() &&
!keyValue[1].trim().isEmpty()) {
+                    props.put(keyValue[0].trim(), keyValue[1].trim());
+                } else {
+                    throw new IllegalArgumentException("Found invalid property " + keyValue[0]
+                            + ". Schedule properties must be comma separated key-value pairs.
"
+                            + " Example: key1:value1,key2:value2");
+                }
+            }
+        }
+        return props;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index ea86c2a..265106b 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -51,7 +51,8 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract boolean isAlive(Cluster cluster) throws FalconException;
 
-    public abstract void schedule(Entity entity, Boolean skipDryRun) throws FalconException;
+    public abstract void schedule(Entity entity, Boolean skipDryRun, Map<String, String>
properties)
+        throws FalconException;
 
     public abstract String suspend(Entity entity) throws FalconException;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
index f6a4679..d022bae 100644
--- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
@@ -43,6 +43,7 @@ import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.Map;
 import java.util.Properties;
 import java.util.TimeZone;
 
@@ -375,4 +376,33 @@ public class EntityUtilTest extends AbstractTestBase {
             {new Path("/projects/falcon/staging/falcon/workflows/process/test-process/"),
true, false},
         };
     }
+
+    @Test
+    public void testStringToProps() {
+        String testPropsString = "key1:value1,key2 : value2 , key3: value3, key4:value4:test";
+        Map<String, String> props = EntityUtil.getPropertyMap(testPropsString);
+        Assert.assertEquals(props.size(), 4);
+        for (int i = 1; i <= 3; i++) {
+            Assert.assertEquals(props.get("key" + i), "value" + i);
+        }
+        Assert.assertEquals(props.get("key4"), "value4:test");
+    }
+
+    @Test (expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Found invalid property .*",
+            dataProvider = "InvalidProps")
+    public void testInvalidStringToProps(String propString) {
+        String[] invalidProps = {"key1", "key1=value1", "key1:value1,key2=value2, :value"};
+        EntityUtil.getPropertyMap(propString);
+    }
+
+    @DataProvider(name = "InvalidProps")
+    public Object[][] getInvalidProps() {
+        return new Object[][]{
+            {"key1"},
+            {"key1=value1"},
+            {"key1:value1,key2=value2"},
+            {":value"},
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 5f79ca1..96661ad 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -148,7 +148,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public void schedule(Entity entity, Boolean skipDryRun) throws FalconException {
+    public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> suppliedProps)
throws FalconException {
         Map<String, BundleJob> bundleMap = findLatestBundle(entity);
         List<String> schedClusters = new ArrayList<String>();
         for (Map.Entry<String, BundleJob> entry : bundleMap.entrySet()) {
@@ -172,7 +172,6 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                     LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(),
cluster);
                     continue;
                 }
-
                 //Do dryRun of coords before schedule as schedule is asynchronous
                 dryRunInternal(cluster, new Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH)),
skipDryRun);
                 scheduleEntity(clusterName, properties, entity);

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index 61638f3..3280789 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -39,8 +39,8 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
@@ -67,10 +67,11 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type")
String type,
             @Dimension("entityName") @PathParam("entity") String entity,
             @Dimension("colo") @PathParam("colo") String colo,
-            @QueryParam("skipDryRun") Boolean skipDryRun) {
+            @QueryParam("skipDryRun") Boolean skipDryRun,
+            @QueryParam("properties") String properties) {
         checkColo(colo);
         try {
-            scheduleInternal(type, entity, skipDryRun);
+            scheduleInternal(type, entity, skipDryRun,  EntityUtil.getPropertyMap(properties));
             return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") scheduled
successfully");
         } catch (Throwable e) {
             LOG.error("Unable to schedule workflow", e);
@@ -78,8 +79,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         }
     }
 
-    private synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun)
-        throws FalconException, AuthorizationException {
+    private synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun,
+            Map<String, String> properties) throws FalconException, AuthorizationException
{
 
         checkSchedulableEntity(type);
         Entity entityObj = null;
@@ -91,7 +92,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
                         + entityObj.toShortString());
             }
             LOG.info("Memory lock obtained for {} by {}", entityObj.toShortString(), Thread.currentThread().getName());
-            getWorkflowEngine().schedule(entityObj, skipDryRun);
+            getWorkflowEngine().schedule(entityObj, skipDryRun, properties);
         } catch (Exception e) {
             throw new FalconException("Entity schedule failed for " + type + ": " + entity,
e);
         } finally {
@@ -100,7 +101,6 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
                 LOG.info("Memory lock released for {}", entityObj.toShortString());
             }
         }
-
     }
 
     /**
@@ -112,12 +112,13 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
     public APIResult submitAndSchedule(
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type")
String type,
             @Dimension("colo") @PathParam("colo") String colo,
-            @QueryParam("skipDryRun") Boolean skipDryRun) {
+            @QueryParam("skipDryRun") Boolean skipDryRun,
+            @QueryParam("properties") String properties) {
         checkColo(colo);
         try {
             checkSchedulableEntity(type);
             Entity entity = submitInternal(request, type);
-            scheduleInternal(type, entity.getName(), skipDryRun);
+            scheduleInternal(type, entity.getName(), skipDryRun, EntityUtil.getPropertyMap(properties));
             return new APIResult(APIResult.Status.SUCCEEDED,
                     entity.getName() + "(" + type + ") scheduled successfully");
         } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 23f1605..61a80c1 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -389,7 +389,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
                               @Dimension("entityType") @PathParam("type") final String type,
                               @Dimension("entityName") @PathParam("entity") final String
entity,
                               @Dimension("colo") @QueryParam("colo") final String coloExpr,
-                              @QueryParam("skipDryRun") final Boolean skipDryRun) {
+                              @QueryParam("skipDryRun") final Boolean skipDryRun,
+                              @QueryParam("properties") final String properties) {
 
         final HttpServletRequest bufferedRequest = getBufferedRequest(request);
         return new EntityProxy(type, entity) {
@@ -400,7 +401,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity,
colo, skipDryRun);
+                return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity,
colo, skipDryRun,
+                        properties);
             }
         }.execute();
     }
@@ -414,12 +416,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     public APIResult submitAndSchedule(
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type")
String type,
             @Dimension("colo") @QueryParam("colo") String coloExpr,
-            @QueryParam("skipDryRun") Boolean skipDryRun) {
+            @QueryParam("skipDryRun") Boolean skipDryRun,
+            @QueryParam("properties") String properties) {
         BufferedRequest bufferedRequest = new BufferedRequest(request);
         String entity = getEntity(bufferedRequest, type).getName();
         Map<String, APIResult> results = new HashMap<String, APIResult>();
         results.put("submit", submit(bufferedRequest, type, coloExpr));
-        results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr, skipDryRun));
+        results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr, skipDryRun,
properties));
         return consolidateResult(results, APIResult.class);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index d907683..169614b 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -119,8 +119,8 @@ public class FalconUnitClient extends AbstractFalconClient {
      */
     @Override
     public APIResult schedule(EntityType entityType, String entityName, String cluster,
-                              Boolean skipDryRun, String doAsUser) throws FalconCLIException
{
-        return schedule(entityType, entityName, null, 0, cluster, skipDryRun);
+                              Boolean skipDryRun, String doAsUser, String properties) throws
FalconCLIException {
+        return schedule(entityType, entityName, null, 0, cluster, skipDryRun, properties);
     }
 
 
@@ -134,7 +134,7 @@ public class FalconUnitClient extends AbstractFalconClient {
      * @return boolean
      */
     public APIResult schedule(EntityType entityType, String entityName, String startTime,
int numInstances,
-                              String cluster, Boolean skipDryRun) throws FalconCLIException
{
+                              String cluster, Boolean skipDryRun, String properties) throws
FalconCLIException {
         try {
             FalconUnitHelper.checkSchedulableEntity(entityType.toString());
             Entity entity = EntityUtil.getEntity(entityType, entityName);
@@ -147,7 +147,7 @@ public class FalconUnitClient extends AbstractFalconClient {
             if (StringUtils.isNotEmpty(startTime) && entityType == EntityType.PROCESS)
{
                 updateStartAndEndTime((Process) entity, startTime, numInstances, cluster);
             }
-            workflowEngine.schedule(entity, skipDryRun);
+            workflowEngine.schedule(entity, skipDryRun,  EntityUtil.getPropertyMap(properties));
             LOG.info(entityName + " is scheduled successfully");
             return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + "PROCESS" + ")
scheduled successfully");
         } catch (FalconException e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index df73628..995af2b 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -147,24 +147,27 @@ public class FalconUnitTestBase {
     }
 
     public APIResult scheduleProcess(String processName, String startTime, int numInstances,
-                                   String cluster, String localWfPath, Boolean skipDryRun)
throws FalconException,
-            IOException, FalconCLIException {
+                                   String cluster, String localWfPath, Boolean skipDryRun,
+                                   String properties) throws FalconException, IOException,
FalconCLIException {
         Process processEntity = configStore.get(EntityType.PROCESS, processName);
         if (processEntity == null) {
             throw new FalconException("Process not found " + processName);
         }
         String workflowPath = processEntity.getWorkflow().getPath();
         fs.copyFromLocalFile(new Path(localWfPath), new Path(workflowPath));
-        return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances,
cluster, skipDryRun);
+        return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances,
cluster,
+                skipDryRun, properties);
     }
 
     public APIResult scheduleProcess(String processName, String startTime, int numInstances,
-                                   String cluster, Boolean skipDryRun) throws FalconException,
FalconCLIException {
+                                   String cluster, Boolean skipDryRun,
+                                   String properties) throws FalconException, FalconCLIException
{
         Process processEntity = configStore.get(EntityType.PROCESS, processName);
         if (processEntity == null) {
             throw new FalconException("Process not found " + processName);
         }
-        return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances,
cluster, skipDryRun);
+        return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances,
cluster,
+                skipDryRun, properties);
     }
 
     private Map<String, String> updateColoAndCluster(String colo, String cluster, Map<String,
String> props) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 498f50e..fa9c664 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -44,7 +44,8 @@ public class TestFalconUnit extends FalconUnitTestBase {
         createData("in", "local", scheduleTime, "input.txt");
         result = submitProcess(getAbsolutePath("/process.xml"), "/app/oozie-mr");
         assertStatus(result);
-        result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"),
true);
+        result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"),
+                true, "");
         assertStatus(result);
         waitForStatus(EntityType.PROCESS, "process", scheduleTime);
         InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(EntityType.PROCESS,

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index 3bafb25..1c0fc74 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -129,8 +129,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager
{
                               @Dimension("entityType") @PathParam("type") String type,
                               @Dimension("entityName") @PathParam("entity") String entity,
                               @Dimension("colo") @QueryParam("colo") String colo,
-                              @QueryParam("skipDryRun") Boolean skipDryRun) {
-        return super.schedule(request, type, entity, colo, skipDryRun);
+                              @QueryParam("skipDryRun") Boolean skipDryRun,
+                              @QueryParam("properties") String properties) {
+        return super.schedule(request, type, entity, colo, skipDryRun, properties);
     }
 
     @POST

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index b859256..5ed0a4e 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -184,7 +184,8 @@ public class FalconCLIIT {
 
         Assert.assertEquals(executeWithURL("entity -schedule -type feed -name " + overlay.get("outputFeedName")),
0);
 
-        Assert.assertEquals(executeWithURL("entity -schedule -type process -name " + overlay.get("processName")),
0);
+        Assert.assertEquals(executeWithURL("entity -schedule -type process -name " + overlay.get("processName")
+                                            + " -properties key:value"), 0);
 
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index 220e5a7..50d5b94 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -642,9 +642,9 @@ public class EntityManagerJerseyIT {
         File tmpFile = TestContext.getTempFile();
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
         if (withDoAs) {
-            context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, null, "testUser");
+            context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, null, "testUser",
null);
         } else {
-            context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
+            context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, null, "", "key1:value1");
         }
         OozieTestUtils.waitForBundleStart(context, Status.RUNNING);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/b806b32f/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index f031137..c9e9d4f 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -231,20 +231,20 @@ public class TestContext {
     }
 
     public void scheduleProcess(String processTemplate, Map<String, String> overlay)
throws Exception {
-        scheduleProcess(processTemplate, overlay, true, null, "");
+        scheduleProcess(processTemplate, overlay, true, null, "", null);
     }
 
     public void scheduleProcess(String processTemplate, Map<String, String> overlay,
-                                Boolean skipDryRun, final String doAsUSer) throws Exception
{
-        scheduleProcess(processTemplate, overlay, true, skipDryRun, doAsUSer);
+                                Boolean skipDryRun, final String doAsUSer, String properties)
throws Exception {
+        scheduleProcess(processTemplate, overlay, true, skipDryRun, doAsUSer, properties);
     }
 
     public void scheduleProcess(String processTemplate, Map<String, String> overlay,
boolean succeed) throws Exception{
-        scheduleProcess(processTemplate, overlay, succeed, null, "");
+        scheduleProcess(processTemplate, overlay, succeed, null, "", null);
     }
 
     public void scheduleProcess(String processTemplate, Map<String, String> overlay,
boolean succeed,
-                                Boolean skipDryRun, final String doAsUser) throws Exception
{
+                                Boolean skipDryRun, final String doAsUser, String properties)
throws Exception {
         ClientResponse response = submitToFalcon(CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
         assertSuccessful(response);
 
@@ -254,7 +254,7 @@ public class TestContext {
         response = submitToFalcon(FEED_TEMPLATE2, overlay, EntityType.FEED);
         assertSuccessful(response);
 
-        response = submitAndSchedule(processTemplate, overlay, EntityType.PROCESS, skipDryRun,
doAsUser);
+        response = submitAndSchedule(processTemplate, overlay, EntityType.PROCESS, skipDryRun,
doAsUser, properties);
         if (succeed) {
             assertSuccessful(response);
         } else {
@@ -289,12 +289,12 @@ public class TestContext {
 
     public ClientResponse submitAndSchedule(String template, Map<String, String> overlay,
EntityType entityType)
         throws Exception {
-        return submitAndSchedule(template, overlay, entityType, null, "");
+        return submitAndSchedule(template, overlay, entityType, null, "", null);
     }
 
     public ClientResponse submitAndSchedule(String template, Map<String, String> overlay,
                                             EntityType entityType, Boolean skipDryRun,
-                                            final String doAsUser) throws Exception {
+                                            final String doAsUser, String properties) throws
Exception {
         String tmpFile = overlayParametersOverTemplate(template, overlay);
         ServletInputStream rawlogStream = getServletInputStream(tmpFile);
 
@@ -308,6 +308,10 @@ public class TestContext {
             resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
         }
 
+        if (StringUtils.isNotEmpty(properties)) {
+            resource = resource.queryParam("properties", properties);
+        }
+
         return resource.header("Cookie", getAuthenticationToken())
                 .accept(MediaType.TEXT_XML)
                 .type(MediaType.TEXT_XML)


Mime
View raw message