falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sowmya...@apache.org
Subject [3/3] falcon git commit: FALCON-1027 Falcon proxy user support. Contributed by Sowmya Ramesh.
Date Tue, 15 Sep 2015 01:46:40 GMT
FALCON-1027 Falcon proxy user support. Contributed by Sowmya Ramesh.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d8fbec9f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d8fbec9f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d8fbec9f

Branch: refs/heads/master
Commit: d8fbec9f97ba2bd2207827c7b83a8b861c25f9f0
Parents: cbd7c80
Author: Sowmya Ramesh <sramesh@hortonworks.com>
Authored: Mon Sep 14 18:46:21 2015 -0700
Committer: Sowmya Ramesh <sramesh@hortonworks.com>
Committed: Mon Sep 14 18:46:21 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/falcon/cli/FalconCLI.java   |  83 ++++--
 .../apache/falcon/cli/FalconMetadataCLI.java    |  18 +-
 .../falcon/client/AbstractFalconClient.java     |   4 +-
 .../org/apache/falcon/client/FalconClient.java  | 298 ++++++++++++-------
 .../org/apache/falcon/security/CurrentUser.java |  40 ++-
 .../apache/falcon/security/SecurityUtil.java    |  14 +-
 .../apache/falcon/service/GroupsService.java    |  67 +++++
 .../apache/falcon/service/ProxyUserService.java | 203 +++++++++++++
 common/src/main/resources/runtime.properties    |  18 +-
 common/src/main/resources/startup.properties    |   4 +-
 .../apache/falcon/security/CurrentUserTest.java |  59 ++++
 .../falcon/security/SecurityUtilTest.java       |  53 +++-
 .../falcon/service/GroupsServiceTest.java       |  56 ++++
 .../falcon/service/ProxyUserServiceTest.java    | 167 +++++++++++
 docs/src/site/twiki/FalconCLI.twiki             |  14 +
 docs/src/site/twiki/FalconDocumentation.twiki   |  26 ++
 .../site/twiki/restapi/AdjacentVertices.twiki   |  21 ++
 docs/src/site/twiki/restapi/AdminStack.twiki    |   5 +-
 docs/src/site/twiki/restapi/AdminVersion.twiki  |   5 +-
 docs/src/site/twiki/restapi/AllEdges.twiki      |   4 +-
 docs/src/site/twiki/restapi/AllVertices.twiki   |   4 +-
 docs/src/site/twiki/restapi/Edge.twiki          |   3 +-
 .../site/twiki/restapi/EntityDefinition.twiki   |   3 +-
 docs/src/site/twiki/restapi/EntityDelete.twiki  |   3 +-
 .../site/twiki/restapi/EntityDependencies.twiki |   3 +-
 docs/src/site/twiki/restapi/EntityLineage.twiki |   3 +-
 docs/src/site/twiki/restapi/EntityList.twiki    |   3 +-
 docs/src/site/twiki/restapi/EntityResume.twiki  |   3 +-
 .../src/site/twiki/restapi/EntitySchedule.twiki |   5 +-
 docs/src/site/twiki/restapi/EntityStatus.twiki  |   3 +-
 docs/src/site/twiki/restapi/EntitySubmit.twiki  |   5 +-
 .../twiki/restapi/EntitySubmitAndSchedule.twiki |   5 +-
 docs/src/site/twiki/restapi/EntitySummary.twiki |   3 +-
 docs/src/site/twiki/restapi/EntitySuspend.twiki |   3 +-
 docs/src/site/twiki/restapi/EntityTouch.twiki   |   7 +-
 docs/src/site/twiki/restapi/EntityUpdate.twiki  |   5 +-
 .../src/site/twiki/restapi/EntityValidate.twiki |   5 +-
 .../twiki/restapi/FeedInstanceListing.twiki     |   3 +-
 docs/src/site/twiki/restapi/FeedLookup.twiki    |   5 +-
 docs/src/site/twiki/restapi/Graph.twiki         |   4 +-
 docs/src/site/twiki/restapi/InstanceKill.twiki  |   3 +-
 docs/src/site/twiki/restapi/InstanceList.twiki  |   3 +-
 docs/src/site/twiki/restapi/InstanceLogs.twiki  |   3 +-
 .../src/site/twiki/restapi/InstanceParams.twiki |   5 +-
 docs/src/site/twiki/restapi/InstanceRerun.twiki |   3 +-
 .../src/site/twiki/restapi/InstanceResume.twiki |  44 +--
 .../site/twiki/restapi/InstanceRunning.twiki    |   5 +-
 .../src/site/twiki/restapi/InstanceStatus.twiki |   4 +-
 .../site/twiki/restapi/InstanceSummary.twiki    |   3 +-
 .../site/twiki/restapi/InstanceSuspend.twiki    |   5 +-
 docs/src/site/twiki/restapi/MetadataList.twiki  |   3 +-
 .../site/twiki/restapi/MetadataRelations.twiki  |   3 +-
 docs/src/site/twiki/restapi/Triage.twiki        |   3 +-
 docs/src/site/twiki/restapi/Vertex.twiki        |   3 +-
 .../site/twiki/restapi/VertexProperties.twiki   |   3 +-
 docs/src/site/twiki/restapi/Vertices.twiki      |   5 +-
 .../falcon/resource/AbstractEntityManager.java  |  16 +-
 .../AbstractSchedulableEntityManager.java       |   4 +-
 .../falcon/resource/channel/HTTPChannel.java    |   5 +
 .../proxy/SchedulableEntityManagerProxy.java    |  10 +-
 .../security/FalconAuthenticationFilter.java    |   8 +-
 .../security/FalconAuthorizationFilter.java     |   9 +-
 .../apache/falcon/security/HostnameFilter.java  | 105 +++++++
 prism/src/main/webapp/WEB-INF/web.xml           |  10 +
 .../falcon/resource/EntityManagerTest.java      |  29 +-
 .../FalconAuthenticationFilterTest.java         |  83 +++++-
 .../falcon/security/HostnameFilterTest.java     |  93 ++++++
 src/conf/runtime.properties                     |  16 +
 src/conf/startup.properties                     |   4 +-
 .../apache/falcon/unit/FalconUnitClient.java    |   6 +-
 .../apache/falcon/unit/FalconUnitTestBase.java  |   4 +-
 webapp/pom.xml                                  |  22 ++
 webapp/src/conf/oozie/conf/oozie-site.xml       |   4 +-
 .../resource/SchedulableEntityManager.java      |  10 +-
 .../src/main/webapp/WEB-INF/distributed/web.xml |  10 +
 webapp/src/main/webapp/WEB-INF/embedded/web.xml |  10 +
 webapp/src/main/webapp/WEB-INF/web.xml          |  10 +
 .../java/org/apache/falcon/cli/FalconCLIIT.java | 162 ++++++++--
 .../falcon/resource/EntityManagerJerseyIT.java  |  26 +-
 .../resource/MetadataResourceJerseyIT.java      |   1 +
 .../org/apache/falcon/resource/TestContext.java |  54 +++-
 webapp/src/test/resources/runtime.properties    |  50 ++++
 83 files changed, 1772 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/d8fbec9f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 688c3c4..be4324c 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
     FALCON-1357 Update CHANGES.txt to change 0.7 branch to release.(Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8fbec9f/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index d4da302..e684678 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -142,6 +142,9 @@ public class FalconCLI {
         clientProperties = getClientProperties();
     }
 
+    // doAs option
+    public static final String DO_AS_OPT = "doAs";
+
     /**
      * Entry point for the Falcon CLI when invoked from the command line. Upon
      * completion this method exits the JVM with '0' (success) or '-1'
@@ -255,6 +258,7 @@ public class FalconCLI {
         String filterBy = commandLine.getOptionValue(FILTER_BY_OPT);
         String orderBy = commandLine.getOptionValue(ORDER_BY_OPT);
         String sortOrder = commandLine.getOptionValue(SORT_ORDER_OPT);
+        String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
         Integer offset = parseIntegerInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset");
         Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT), null, "numResults");
 
@@ -281,7 +285,7 @@ public class FalconCLI {
             result =
                 ResponseHelper.getString(client.getRunningInstances(type,
                         entity, colo, lifeCycles, filterBy, orderBy, sortOrder,
-                        offset, numResults));
+                        offset, numResults, doAsUser));
         } else if (optionsList.contains(STATUS_OPT) || optionsList.contains(LIST_OPT)) {
             validateOrderBy(orderBy, instanceAction);
             validateFilterBy(filterBy, instanceAction);
@@ -289,35 +293,35 @@ public class FalconCLI {
                 ResponseHelper.getString(client
                         .getStatusOfInstances(type, entity, start, end, colo,
                                 lifeCycles,
-                                filterBy, orderBy, sortOrder, offset, numResults));
+                                filterBy, orderBy, sortOrder, offset, numResults, doAsUser));
         } else if (optionsList.contains(SUMMARY_OPT)) {
             validateOrderBy(orderBy, "summary");
             validateFilterBy(filterBy, "summary");
             result =
-                ResponseHelper.getString(client
-                        .getSummaryOfInstances(type, entity, start, end, colo,
-                                lifeCycles, filterBy, orderBy, sortOrder));
+                    ResponseHelper.getString(client
+                            .getSummaryOfInstances(type, entity, start, end, colo,
+                                    lifeCycles, filterBy, orderBy, sortOrder, doAsUser));
         } else if (optionsList.contains(KILL_OPT)) {
             validateNotEmpty(start, START_OPT);
             validateNotEmpty(end, END_OPT);
             result =
                 ResponseHelper.getString(client
                         .killInstances(type, entity, start, end, colo, clusters,
-                                sourceClusters, lifeCycles));
+                                sourceClusters, lifeCycles, doAsUser));
         } else if (optionsList.contains(SUSPEND_OPT)) {
             validateNotEmpty(start, START_OPT);
             validateNotEmpty(end, END_OPT);
             result =
                 ResponseHelper.getString(client
                         .suspendInstances(type, entity, start, end, colo, clusters,
-                                sourceClusters, lifeCycles));
+                                sourceClusters, lifeCycles, doAsUser));
         } else if (optionsList.contains(RESUME_OPT)) {
             validateNotEmpty(start, START_OPT);
             validateNotEmpty(end, END_OPT);
             result =
                 ResponseHelper.getString(client
                         .resumeInstances(type, entity, start, end, colo, clusters,
-                                sourceClusters, lifeCycles));
+                                sourceClusters, lifeCycles, doAsUser));
         } else if (optionsList.contains(RERUN_OPT)) {
             validateNotEmpty(start, START_OPT);
             validateNotEmpty(end, END_OPT);
@@ -329,7 +333,7 @@ public class FalconCLI {
                 ResponseHelper.getString(client
                         .rerunInstances(type, entity, start, end, filePath, colo,
                                 clusters, sourceClusters,
-                                lifeCycles, isForced));
+                                lifeCycles, isForced, doAsUser));
         } else if (optionsList.contains(LOG_OPT)) {
             validateOrderBy(orderBy, instanceAction);
             validateFilterBy(filterBy, instanceAction);
@@ -337,18 +341,18 @@ public class FalconCLI {
                 ResponseHelper.getString(client
                                 .getLogsOfInstances(type, entity, start, end, colo, runId,
                                         lifeCycles,
-                                        filterBy, orderBy, sortOrder, offset, numResults),
+                                        filterBy, orderBy, sortOrder, offset, numResults, doAsUser),
                         runId);
         } else if (optionsList.contains(PARARMS_OPT)) {
             // start time is the nominal time of instance
             result =
                 ResponseHelper
                     .getString(client.getParamsOfInstance(
-                            type, entity, start, colo, lifeCycles));
+                            type, entity, start, colo, lifeCycles, doAsUser));
         } else if (optionsList.contains(LISTING_OPT)) {
             result =
                 ResponseHelper.getString(client
-                        .getFeedListing(type, entity, start, end, colo));
+                        .getFeedListing(type, entity, start, end, colo, doAsUser));
         } else {
             throw new FalconCLIException("Invalid command");
         }
@@ -429,6 +433,8 @@ public class FalconCLI {
         Integer offset = parseIntegerInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset");
         Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT),
                 null, "numResults");
+        String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
+
         Integer numInstances = parseIntegerInput(commandLine.getOptionValue(NUM_INSTANCES_OPT), 7, "numInstances");
         Boolean skipDryRun = null;
         if (optionsList.contains(SKIPDRYRUN_OPT)) {
@@ -456,61 +462,61 @@ public class FalconCLI {
         if (optionsList.contains(SUBMIT_OPT)) {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
-            result = client.submit(entityType, filePath).getMessage();
+            result = client.submit(entityType, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(LOOKUP_OPT)) {
             validateNotEmpty(feedInstancePath, PATH_OPT);
-            FeedLookupResult resp = client.reverseLookUp(entityType, feedInstancePath);
+            FeedLookupResult resp = client.reverseLookUp(entityType, feedInstancePath, doAsUser);
             result = ResponseHelper.getString(resp);
 
         } else if (optionsList.contains(UPDATE_OPT)) {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
-            result = client.update(entityType, entityName, filePath, skipDryRun).getMessage();
+            result = client.update(entityType, entityName, filePath, skipDryRun, doAsUser).getMessage();
         } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
-            result = client.submitAndSchedule(entityType, filePath, skipDryRun).getMessage();
+            result = client.submitAndSchedule(entityType, filePath, skipDryRun, doAsUser).getMessage();
         } else if (optionsList.contains(VALIDATE_OPT)) {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
-            result = client.validate(entityType, filePath, skipDryRun).getMessage();
+            result = client.validate(entityType, filePath, skipDryRun, doAsUser).getMessage();
         } else if (optionsList.contains(SCHEDULE_OPT)) {
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
             colo = getColo(colo);
-            result = client.schedule(entityTypeEnum, entityName, colo, skipDryRun).getMessage();
+            result = client.schedule(entityTypeEnum, entityName, colo, skipDryRun, doAsUser).getMessage();
         } else if (optionsList.contains(SUSPEND_OPT)) {
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
             colo = getColo(colo);
-            result = client.suspend(entityTypeEnum, entityName, colo).getMessage();
+            result = client.suspend(entityTypeEnum, entityName, colo, doAsUser).getMessage();
         } else if (optionsList.contains(RESUME_OPT)) {
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
             colo = getColo(colo);
-            result = client.resume(entityTypeEnum, entityName, colo).getMessage();
+            result = client.resume(entityTypeEnum, entityName, colo, doAsUser).getMessage();
         } else if (optionsList.contains(DELETE_OPT)) {
             validateColo(optionsList);
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
-            result = client.delete(entityTypeEnum, entityName).getMessage();
+            result = client.delete(entityTypeEnum, entityName, doAsUser).getMessage();
         } else if (optionsList.contains(STATUS_OPT)) {
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
             colo = getColo(colo);
             result =
-                    client.getStatus(entityTypeEnum, entityName, colo).getMessage();
+                    client.getStatus(entityTypeEnum, entityName, colo, doAsUser).getMessage();
         } else if (optionsList.contains(DEFINITION_OPT)) {
             validateColo(optionsList);
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
-            result = client.getDefinition(entityType, entityName).toString();
+            result = client.getDefinition(entityType, entityName, doAsUser).toString();
         } else if (optionsList.contains(DEPENDENCY_OPT)) {
             validateColo(optionsList);
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
-            result = client.getDependency(entityType, entityName).toString();
+            result = client.getDependency(entityType, entityName, doAsUser).toString();
         } else if (optionsList.contains(LIST_OPT)) {
             validateColo(optionsList);
             validateEntityFields(fields);
             validateOrderBy(orderBy, entityAction);
             validateFilterBy(filterBy, entityAction);
             EntityList entityList = client.getEntityList(entityType, fields, nameSubsequence, tagKeywords,
-                    filterBy, filterTags, orderBy, sortOrder, offset, numResults);
+                    filterBy, filterTags, orderBy, sortOrder, offset, numResults, doAsUser);
             result = entityList != null ? entityList.toString() : "No entity of type (" + entityType + ") found.";
         }  else if (optionsList.contains(SUMMARY_OPT)) {
             validateEntityTypeForSummary(entityType);
@@ -523,11 +529,12 @@ public class FalconCLI {
                             .getEntitySummary(
                                     entityType, cluster, start, end, fields, filterBy,
                                     filterTags,
-                                    orderBy, sortOrder, offset, numResults, numInstances));
+                                    orderBy, sortOrder, offset, numResults,
+                                    numInstances, doAsUser));
         } else if (optionsList.contains(TOUCH_OPT)) {
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
             colo = getColo(colo);
-            result = client.touch(entityType, entityName, colo, skipDryRun).getMessage();
+            result = client.touch(entityType, entityName, colo, skipDryRun, doAsUser).getMessage();
         } else if (optionsList.contains(HELP_CMD)) {
             OUT.get().println("Falcon Help");
         } else {
@@ -658,6 +665,8 @@ public class FalconCLI {
                 "show Falcon server build version");
         Option stack = new Option(STACK_OPTION, false,
                 "show the thread stack dump");
+        Option doAs = new Option(DO_AS_OPT, true,
+                "doAs user");
         Option help = new Option("help", false, "show Falcon help");
         group.addOption(status);
         group.addOption(version);
@@ -665,6 +674,7 @@ public class FalconCLI {
         group.addOption(help);
 
         adminOptions.addOptionGroup(group);
+        adminOptions.addOption(doAs);
         return adminOptions;
     }
 
@@ -748,6 +758,7 @@ public class FalconCLI {
                 "Number of instances to return per entity summary request");
         Option path = new Option(PATH_OPT, true, "Path for a feed's instance");
         Option skipDryRun = new Option(SKIPDRYRUN_OPT, false, "skip dry run in workflow engine");
+        Option doAs = new Option(DO_AS_OPT, true, "doAs user");
 
         entityOptions.addOption(url);
         entityOptions.addOption(path);
@@ -770,6 +781,7 @@ public class FalconCLI {
         entityOptions.addOption(numResults);
         entityOptions.addOption(numInstances);
         entityOptions.addOption(skipDryRun);
+        entityOptions.addOption(doAs);
 
         return entityOptions;
     }
@@ -893,6 +905,7 @@ public class FalconCLI {
                 "Number of results to return per request");
         Option forceRerun = new Option(FORCE_RERUN_FLAG, false,
                 "Flag to forcefully rerun entire workflow of an instance");
+        Option doAs = new Option(DO_AS_OPT, true, "doAs user");
 
         Option instanceTime = new Option(INSTANCE_TIME_OPT, true, "Time for an instance");
 
@@ -914,6 +927,7 @@ public class FalconCLI {
         instanceOptions.addOption(sortOrder);
         instanceOptions.addOption(numResults);
         instanceOptions.addOption(forceRerun);
+        instanceOptions.addOption(doAs);
         instanceOptions.addOption(instanceTime);
 
         return instanceOptions;
@@ -936,6 +950,9 @@ public class FalconCLI {
         Option skipDryRunOperation = new Option(SKIPDRYRUN_OPT, false, "skip dryrun operation");
         recipeOptions.addOption(skipDryRunOperation);
 
+        Option doAs = new Option(DO_AS_OPT, true, "doAs user");
+        recipeOptions.addOption(doAs);
+
         return recipeOptions;
     }
 
@@ -964,14 +981,16 @@ public class FalconCLI {
             optionsList.add(option.getOpt());
         }
 
+        String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
+
         if (optionsList.contains(STACK_OPTION)) {
-            result = client.getThreadDump();
+            result = client.getThreadDump(doAsUser);
             OUT.get().println(result);
         }
         int exitValue = 0;
         if (optionsList.contains(STATUS_OPTION)) {
             try {
-                int status = client.getStatus();
+                int status = client.getStatus(doAsUser);
                 if (status != 200) {
                     ERR.get().println("Falcon server is not fully operational (on " + falconUrl + "). "
                             + "Please check log files.");
@@ -984,7 +1003,7 @@ public class FalconCLI {
                 exitValue = -1;
             }
         } else if (optionsList.contains(VERSION_OPTION)) {
-            result = client.getVersion();
+            result = client.getVersion(doAsUser);
             OUT.get().println("Falcon server build version: " + result);
         } else if (optionsList.contains(HELP_CMD)) {
             OUT.get().println("Falcon Help");
@@ -1033,6 +1052,7 @@ public class FalconCLI {
         String recipeName = commandLine.getOptionValue(RECIPE_NAME);
         String recipeToolClass = commandLine.getOptionValue(RECIPE_TOOL_CLASS_NAME);
         String recipeOperation = commandLine.getOptionValue(RECIPE_OPERATION);
+        String doAsUser = commandLine.getOptionValue(DO_AS_OPT);
 
         validateNotEmpty(recipeName, RECIPE_NAME);
         validateNotEmpty(recipeOperation, RECIPE_OPERATION);
@@ -1042,7 +1062,8 @@ public class FalconCLI {
             skipDryRun = true;
         }
 
-        String result = client.submitRecipe(recipeName, recipeToolClass, recipeOperation, skipDryRun).toString();
+        String result = client.submitRecipe(recipeName, recipeToolClass,
+                recipeOperation, skipDryRun, doAsUser).toString();
         OUT.get().println(result);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8fbec9f/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
index 2f57c7d..dbc553c 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
@@ -81,29 +81,30 @@ public class FalconMetadataCLI {
         String value = commandLine.getOptionValue(VALUE_OPT);
         String direction = commandLine.getOptionValue(DIRECTION_OPT);
         String pipeline = commandLine.getOptionValue(PIPELINE_OPT);
+        String doAsUser = commandLine.getOptionValue(FalconCLI.DO_AS_OPT);
 
         if (optionsList.contains(LINEAGE_OPT)) {
             validatePipelineName(pipeline);
-            result = client.getEntityLineageGraph(pipeline).getDotNotation();
+            result = client.getEntityLineageGraph(pipeline, doAsUser).getDotNotation();
         } else if (optionsList.contains(LIST_OPT)) {
             validateDimensionType(dimensionType.toUpperCase());
-            result = client.getDimensionList(dimensionType, cluster);
+            result = client.getDimensionList(dimensionType, cluster, doAsUser);
         } else if (optionsList.contains(RELATIONS_OPT)) {
             validateDimensionType(dimensionType.toUpperCase());
             validateDimensionName(dimensionName, RELATIONS_OPT);
-            result = client.getDimensionRelations(dimensionType, dimensionName);
+            result = client.getDimensionRelations(dimensionType, dimensionName, doAsUser);
         } else if (optionsList.contains(VERTEX_CMD)) {
             validateId(id);
-            result = client.getVertex(id);
+            result = client.getVertex(id, doAsUser);
         } else if (optionsList.contains(VERTICES_CMD)) {
             validateVerticesCommand(key, value);
-            result = client.getVertices(key, value);
+            result = client.getVertices(key, value, doAsUser);
         } else if (optionsList.contains(VERTEX_EDGES_CMD)) {
             validateVertexEdgesCommand(id, direction);
-            result = client.getVertexEdges(id, direction);
+            result = client.getVertexEdges(id, direction, doAsUser);
         } else if (optionsList.contains(EDGE_CMD)) {
             validateId(id);
-            result = client.getEdge(id);
+            result = client.getEdge(id, doAsUser);
         } else {
             throw new FalconCLIException("Invalid metadata command");
         }
@@ -211,6 +212,9 @@ public class FalconMetadataCLI {
         metadataOptions.addOption(value);
         metadataOptions.addOption(direction);
 
+        Option doAs = new Option(FalconCLI.DO_AS_OPT, true, "doAs user");
+        metadataOptions.addOption(doAs);
+
         return metadataOptions;
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8fbec9f/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 282b41b..1146011 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -37,7 +37,7 @@ public abstract class AbstractFalconClient {
      * @return
      * @throws FalconCLIException
      */
-    public abstract APIResult submit(String entityType, String filePath) throws FalconCLIException,
+    public abstract APIResult submit(String entityType, String filePath, String doAsUser) throws FalconCLIException,
             IOException;
 
     /**
@@ -49,6 +49,6 @@ public abstract class AbstractFalconClient {
      * @throws FalconCLIException
      */
     public abstract APIResult schedule(EntityType entityType, String entityName,
-                                       String colo, Boolean skipDryRun) throws FalconCLIException;
+                                       String colo, Boolean skipDryRun, String doAsuser) throws FalconCLIException;
 
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8fbec9f/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 44436d2..6075f5c 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.net.util.TrustManagerUtils;
 import org.apache.falcon.LifeCycle;
+import org.apache.falcon.cli.FalconCLI;
 import org.apache.falcon.cli.FalconMetadataCLI;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
@@ -285,52 +286,54 @@ public class FalconClient extends AbstractFalconClient {
         return str;
     }
 
-    public APIResult schedule(EntityType entityType, String entityName, String colo, Boolean skipDryRun)
+    public APIResult schedule(EntityType entityType, String entityName, String colo,
+                              Boolean skipDryRun, String doAsUser)
         throws FalconCLIException {
 
         return sendEntityRequest(Entities.SCHEDULE, entityType, entityName,
-                colo, skipDryRun);
+                colo, skipDryRun, doAsUser);
 
     }
 
-    public APIResult suspend(EntityType entityType, String entityName, String colo)
+    public APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo, null);
+        return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo, null, doAsUser);
 
     }
 
-    public APIResult resume(EntityType entityType, String entityName, String colo)
+    public APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.RESUME, entityType, entityName, colo, null);
+        return sendEntityRequest(Entities.RESUME, entityType, entityName, colo, null, doAsUser);
 
     }
 
-    public APIResult delete(EntityType entityType, String entityName)
+    public APIResult delete(EntityType entityType, String entityName, String doAsUser)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.DELETE, entityType, entityName, null, null);
+        return sendEntityRequest(Entities.DELETE, entityType, entityName, null, null, doAsUser);
 
     }
 
-    public APIResult validate(String entityType, String filePath, Boolean skipDryRun)
+    public APIResult validate(String entityType, String filePath, Boolean skipDryRun, String doAsUser)
         throws FalconCLIException {
 
         InputStream entityStream = getServletInputStream(filePath);
         return sendEntityRequestWithObject(Entities.VALIDATE, entityType,
-                entityStream, null, skipDryRun);
+                entityStream, null, skipDryRun, doAsUser);
     }
 
-    public APIResult submit(String entityType, String filePath)
+    public APIResult submit(String entityType, String filePath, String doAsUser)
         throws FalconCLIException {
 
         InputStream entityStream = getServletInputStream(filePath);
         return sendEntityRequestWithObject(Entities.SUBMIT, entityType,
-                entityStream, null, null);
+                entityStream, null, null, doAsUser);
     }
 
-    public APIResult update(String entityType, String entityName, String filePath, Boolean skipDryRun)
+    public APIResult update(String entityType, String entityName, String filePath,
+                            Boolean skipDryRun, String doAsUser)
         throws FalconCLIException {
         InputStream entityStream = getServletInputStream(filePath);
         Entities operation = Entities.UPDATE;
@@ -338,6 +341,9 @@ public class FalconClient extends AbstractFalconClient {
         if (null != skipDryRun) {
             resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
         }
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(operation.mimeType).type(MediaType.TEXT_XML)
@@ -346,30 +352,31 @@ public class FalconClient extends AbstractFalconClient {
         return parseAPIResult(clientResponse);
     }
 
-    public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun)
+    public APIResult submitAndSchedule(String entityType, String filePath,
+                                       Boolean skipDryRun, String doAsUser)
         throws FalconCLIException {
 
         InputStream entityStream = getServletInputStream(filePath);
         return sendEntityRequestWithObject(Entities.SUBMITandSCHEDULE,
-                entityType, entityStream, null, skipDryRun);
+                entityType, entityStream, null, skipDryRun, doAsUser);
     }
 
-    public APIResult getStatus(EntityType entityType, String entityName, String colo)
+    public APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.STATUS, entityType, entityName, colo, null);
+        return sendEntityRequest(Entities.STATUS, entityType, entityName, colo, null, doAsUser);
     }
 
-    public Entity getDefinition(String entityType, String entityName)
+    public Entity getDefinition(String entityType, String entityName, String doAsUser)
         throws FalconCLIException {
 
         return sendDefinitionRequest(Entities.DEFINITION, entityType,
-                entityName);
+                entityName, doAsUser);
     }
 
-    public EntityList getDependency(String entityType, String entityName)
+    public EntityList getDependency(String entityType, String entityName, String doAsUser)
         throws FalconCLIException {
-        return sendDependencyRequest(Entities.DEPENDENCY, entityType, entityName);
+        return sendDependencyRequest(Entities.DEPENDENCY, entityType, entityName, doAsUser);
     }
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@@ -389,22 +396,22 @@ public class FalconClient extends AbstractFalconClient {
 
     public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords,
                                     String filterBy, String filterTags, String orderBy, String sortOrder,
-                                    Integer offset, Integer numResults) throws FalconCLIException {
+                                    Integer offset, Integer numResults, String doAsUser) throws FalconCLIException {
         return sendListRequest(Entities.LIST, entityType, fields, nameSubsequence, tagKeywords, filterBy,
-                filterTags, orderBy, sortOrder, offset, numResults);
+                filterTags, orderBy, sortOrder, offset, numResults, doAsUser);
     }
 
     public EntitySummaryResult getEntitySummary(String entityType, String cluster, String start, String end,
                                    String fields, String filterBy, String filterTags,
                                    String orderBy, String sortOrder,
-                                   Integer offset, Integer numResults, Integer numInstances)
+                                   Integer offset, Integer numResults, Integer numInstances, String doAsUser)
         throws FalconCLIException {
         return sendEntitySummaryRequest(Entities.SUMMARY, entityType, cluster, start, end, fields, filterBy, filterTags,
-                orderBy, sortOrder, offset, numResults, numInstances);
+                orderBy, sortOrder, offset, numResults, numInstances, doAsUser);
     }
 
-    public APIResult touch(String entityType, String entityName,
-                           String colo, Boolean skipDryRun) throws FalconCLIException {
+    public APIResult touch(String entityType, String entityName, String colo,
+                           Boolean skipDryRun, String doAsUser) throws FalconCLIException {
         Entities operation = Entities.TOUCH;
         WebResource resource = service.path(operation.path).path(entityType).path(entityName);
         if (colo != null) {
@@ -413,6 +420,9 @@ public class FalconClient extends AbstractFalconClient {
         if (null != skipDryRun) {
             resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
         }
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(operation.mimeType).type(MediaType.TEXT_XML)
@@ -423,10 +433,10 @@ public class FalconClient extends AbstractFalconClient {
 
     public InstancesResult getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles,
                                       String filterBy, String orderBy, String sortOrder,
-                                      Integer offset, Integer numResults) throws FalconCLIException {
+                                      Integer offset, Integer numResults, String doAsUser) throws FalconCLIException {
 
         return sendInstanceRequest(Instances.RUNNING, type, entity, null, null,
-                null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults)
+                null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser)
                 .getEntity(InstancesResult.class);
     }
 
@@ -434,62 +444,66 @@ public class FalconClient extends AbstractFalconClient {
                                        String start, String end,
                                        String colo, List<LifeCycle> lifeCycles, String filterBy,
                                        String orderBy, String sortOrder,
-                                       Integer offset, Integer numResults) throws FalconCLIException {
+                                       Integer offset, Integer numResults, String doAsUser) throws FalconCLIException {
 
         return sendInstanceRequest(Instances.STATUS, type, entity, start, end,
-                null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults)
+                null, null, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser)
                 .getEntity(InstancesResult.class);
     }
 
     public InstancesSummaryResult getSummaryOfInstances(String type, String entity,
                                         String start, String end,
                                         String colo, List<LifeCycle> lifeCycles,
-                                        String filterBy, String orderBy, String sortOrder) throws FalconCLIException {
+                                        String filterBy, String orderBy, String sortOrder,
+                                        String doAsUser) throws FalconCLIException {
 
         return sendInstanceRequest(Instances.SUMMARY, type, entity, start, end, null,
-            null, colo, lifeCycles, filterBy, orderBy, sortOrder, 0, null)
+                null, colo, lifeCycles, filterBy, orderBy, sortOrder, 0, null, doAsUser)
                 .getEntity(InstancesSummaryResult.class);
     }
 
     public FeedInstanceResult getFeedListing(String type, String entity, String start,
-                                     String end, String colo)
+                                     String end, String colo, String doAsUser)
         throws FalconCLIException {
 
         return sendInstanceRequest(Instances.LISTING, type, entity, start, end, null,
-            null, colo, null, "", "", "", 0, null).getEntity(FeedInstanceResult.class);
+                null, colo, null, "", "", "", 0, null, doAsUser).getEntity(FeedInstanceResult.class);
     }
 
     public InstancesResult killInstances(String type, String entity, String start,
                                 String end, String colo, String clusters,
-                                String sourceClusters, List<LifeCycle> lifeCycles)
+                                String sourceClusters, List<LifeCycle> lifeCycles,
+                                String doAsUser)
         throws FalconCLIException, UnsupportedEncodingException {
 
         return sendInstanceRequest(Instances.KILL, type, entity, start, end,
-                getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles);
+                getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles, doAsUser);
     }
 
     public InstancesResult suspendInstances(String type, String entity, String start,
                                    String end, String colo, String clusters,
-                                   String sourceClusters, List<LifeCycle> lifeCycles)
+                                   String sourceClusters, List<LifeCycle> lifeCycles,
+                                   String doAsUser)
         throws FalconCLIException, UnsupportedEncodingException {
 
         return sendInstanceRequest(Instances.SUSPEND, type, entity, start, end,
-                getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles);
+                getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles, doAsUser);
     }
 
     public InstancesResult resumeInstances(String type, String entity, String start,
                                   String end, String colo, String clusters,
-                                  String sourceClusters, List<LifeCycle> lifeCycles)
+                                  String sourceClusters, List<LifeCycle> lifeCycles,
+                                  String doAsUser)
         throws FalconCLIException, UnsupportedEncodingException {
 
         return sendInstanceRequest(Instances.RESUME, type, entity, start, end,
-                getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles);
+                getServletInputStream(clusters, sourceClusters, null), null, colo, lifeCycles, doAsUser);
     }
 
     public InstancesResult rerunInstances(String type, String entity, String start,
                                  String end, String filePath, String colo,
                                  String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
-                                 Boolean isForced)
+                                 Boolean isForced, String doAsUser)
         throws FalconCLIException, IOException {
 
         StringBuilder buffer = new StringBuilder();
@@ -508,55 +522,69 @@ public class FalconClient extends AbstractFalconClient {
         }
         String temp = (buffer.length() == 0) ? null : buffer.toString();
         return sendInstanceRequest(Instances.RERUN, type, entity, start, end,
-                getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles, isForced);
+                getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles, isForced, doAsUser);
     }
 
     public InstancesResult getLogsOfInstances(String type, String entity, String start,
                                      String end, String colo, String runId,
                                      List<LifeCycle> lifeCycles, String filterBy,
-                                     String orderBy, String sortOrder, Integer offset, Integer numResults)
+                                     String orderBy, String sortOrder, Integer offset,
+                                     Integer numResults, String doAsUser)
         throws FalconCLIException {
 
         return sendInstanceRequest(Instances.LOG, type, entity, start, end,
-                null, runId, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults)
+                null, runId, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser)
                 .getEntity(InstancesResult.class);
     }
 
     public InstancesResult getParamsOfInstance(String type, String entity,
                                       String start, String colo,
-                                      List<LifeCycle> lifeCycles)
+                                      List<LifeCycle> lifeCycles,
+                                      String doAsUser)
         throws FalconCLIException, UnsupportedEncodingException {
 
         return sendInstanceRequest(Instances.PARAMS, type, entity,
-                start, null, null, null, colo, lifeCycles);
+                start, null, null, null, colo, lifeCycles, doAsUser);
     }
 
-    public String getThreadDump() throws FalconCLIException {
-        return sendAdminRequest(AdminOperations.STACK);
+    public String getThreadDump(String doAsUser) throws FalconCLIException {
+        return sendAdminRequest(AdminOperations.STACK, doAsUser);
     }
 
-    public String getVersion() throws FalconCLIException {
-        return sendAdminRequest(AdminOperations.VERSION);
+    public String getVersion(String doAsUser) throws FalconCLIException {
+        return sendAdminRequest(AdminOperations.VERSION, doAsUser);
     }
 
-    public int getStatus() throws FalconCLIException {
+    public int getStatus(String doAsUser) throws FalconCLIException {
         AdminOperations job =  AdminOperations.VERSION;
-        ClientResponse clientResponse = service.path(job.path)
+
+        WebResource resource = service.path(job.path);
+
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
+
+        ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(job.mimeType).type(MediaType.TEXT_PLAIN)
                 .method(job.method, ClientResponse.class);
         return clientResponse.getStatus();
     }
 
-    public String getDimensionList(String dimensionType, String cluster) throws FalconCLIException {
-        return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null, cluster);
+    public String getDimensionList(String dimensionType, String cluster, String doAsUser) throws FalconCLIException {
+        return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null, cluster, doAsUser);
     }
 
-    public LineageGraphResult getEntityLineageGraph(String pipelineName) throws FalconCLIException {
+    public LineageGraphResult getEntityLineageGraph(String pipelineName, String doAsUser) throws FalconCLIException {
         MetadataOperations operation = MetadataOperations.LINEAGE;
+
         WebResource resource = service.path(operation.path)
                 .queryParam(FalconMetadataCLI.PIPELINE_OPT, pipelineName);
 
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
+
         ClientResponse clientResponse = resource
             .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
             .accept(operation.mimeType).type(operation.mimeType)
@@ -565,8 +593,9 @@ public class FalconClient extends AbstractFalconClient {
         return clientResponse.getEntity(LineageGraphResult.class);
     }
 
-    public String getDimensionRelations(String dimensionType, String dimensionName) throws FalconCLIException {
-        return sendMetadataDiscoveryRequest(MetadataOperations.RELATIONS, dimensionType, dimensionName, null);
+    public String getDimensionRelations(String dimensionType, String dimensionName,
+                                        String doAsUser) throws FalconCLIException {
+        return sendMetadataDiscoveryRequest(MetadataOperations.RELATIONS, dimensionType, dimensionName, null, doAsUser);
     }
 
     /**
@@ -611,7 +640,8 @@ public class FalconClient extends AbstractFalconClient {
     }
 
     private APIResult sendEntityRequest(Entities entities, EntityType entityType,
-                                     String entityName, String colo, Boolean skipDryRun) throws FalconCLIException {
+                                     String entityName, String colo, Boolean skipDryRun,
+                                     String doAsUser) throws FalconCLIException {
 
         WebResource resource = service.path(entities.path)
                 .path(entityType.toString().toLowerCase()).path(entityName);
@@ -621,6 +651,9 @@ public class FalconClient extends AbstractFalconClient {
         if (null != skipDryRun) {
             resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
         }
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
 
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
@@ -637,7 +670,8 @@ public class FalconClient extends AbstractFalconClient {
                                             String start, String end, String runId, String colo,
                                             String fields, String nameSubsequence, String tagKeywords, String filterBy,
                                             String tags, String orderBy, String sortOrder, Integer offset,
-                                            Integer numResults, Integer numInstances, Boolean isForced) {
+                                            Integer numResults, Integer numInstances, Boolean isForced,
+                                            String doAsUser) {
         if (StringUtils.isNotEmpty(fields)) {
             resource = resource.queryParam("fields", fields);
         }
@@ -683,6 +717,10 @@ public class FalconClient extends AbstractFalconClient {
         if (isForced != null) {
             resource = resource.queryParam("force", String.valueOf(isForced));
         }
+
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
         return resource;
 
     }
@@ -691,7 +729,7 @@ public class FalconClient extends AbstractFalconClient {
                                             String start, String end,
                                             String fields, String filterBy, String filterTags,
                                             String orderBy, String sortOrder, Integer offset, Integer numResults,
-                                            Integer numInstances) throws FalconCLIException {
+                                            Integer numInstances, String doAsUser) throws FalconCLIException {
         WebResource resource = service.path(entities.path).path(entityType);
         if (StringUtils.isNotEmpty(cluster)) {
             resource = resource.queryParam("cluster", cluster);
@@ -700,7 +738,7 @@ public class FalconClient extends AbstractFalconClient {
         resource = addParamsToResource(resource, start, end, null, null,
                 fields, null, null, filterBy, filterTags,
                 orderBy, sortOrder,
-                offset, numResults, numInstances, null);
+                offset, numResults, numInstances, null, doAsUser);
 
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
@@ -713,10 +751,14 @@ public class FalconClient extends AbstractFalconClient {
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     private Entity sendDefinitionRequest(Entities entities, String entityType,
-                                         String entityName) throws FalconCLIException {
+                                         String entityName, String doAsUser) throws FalconCLIException {
 
-        ClientResponse clientResponse = service
-                .path(entities.path).path(entityType).path(entityName)
+        WebResource resource = service.path(entities.path).path(entityType).path(entityName);
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
+
+        ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
                 .method(entities.method, ClientResponse.class);
@@ -729,10 +771,13 @@ public class FalconClient extends AbstractFalconClient {
     }
 
     private EntityList sendDependencyRequest(Entities entities, String entityType,
-                                         String entityName) throws FalconCLIException {
+                                         String entityName, String doAsUser) throws FalconCLIException {
 
-        ClientResponse clientResponse = service
-                .path(entities.path).path(entityType).path(entityName)
+        WebResource resource = service.path(entities.path).path(entityType).path(entityName);
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
+        ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
                 .method(entities.method, ClientResponse.class);
@@ -742,16 +787,22 @@ public class FalconClient extends AbstractFalconClient {
         return parseEntityList(clientResponse);
     }
 
-    private APIResult sendEntityRequestWithObject(Entities entities, String entityType, Object requestObject,
-                                                  String colo, Boolean skipDryRun) throws FalconCLIException {
+    private APIResult sendEntityRequestWithObject(Entities entities, String entityType,
+                                               Object requestObject, String colo,
+                                               Boolean skipDryRun, String doAsUser) throws FalconCLIException {
         WebResource resource = service.path(entities.path)
                 .path(entityType);
         if (colo != null) {
             resource = resource.queryParam("colo", colo);
         }
+
         if (null != skipDryRun) {
             resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
         }
+
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
@@ -763,10 +814,13 @@ public class FalconClient extends AbstractFalconClient {
         return clientResponse.getEntity(APIResult.class);
     }
 
-    public FeedLookupResult reverseLookUp(String type, String path) throws FalconCLIException {
+    public FeedLookupResult reverseLookUp(String type, String path, String doAsUser) throws FalconCLIException {
         Entities api = Entities.LOOKUP;
         WebResource resource = service.path(api.path).path(type);
         resource = resource.queryParam("path", path);
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
         ClientResponse response = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(api.mimeType)
                 .method(api.method, ClientResponse.class);
@@ -778,18 +832,18 @@ public class FalconClient extends AbstractFalconClient {
     private InstancesResult sendInstanceRequest(Instances instances, String type,
                                        String entity, String start, String end, InputStream props,
                                        String runid, String colo,
-                                       List<LifeCycle> lifeCycles) throws FalconCLIException {
+                                       List<LifeCycle> lifeCycles, String doAsUser) throws FalconCLIException {
         return sendInstanceRequest(instances, type, entity, start, end, props,
-                runid, colo, lifeCycles, "", "", "", 0, null)
+                runid, colo, lifeCycles, "", "", "", 0, null, doAsUser)
                 .getEntity(InstancesResult.class);
     }
 
     private InstancesResult sendInstanceRequest(Instances instances, String type,
                                                 String entity, String start, String end, InputStream props,
                                                 String runid, String colo, List<LifeCycle> lifeCycles,
-                                                Boolean isForced) throws FalconCLIException {
+                                                Boolean isForced, String doAsUser) throws FalconCLIException {
         return sendInstanceRequest(instances, type, entity, start, end, props,
-                runid, colo, lifeCycles, "", "", "", 0, null, isForced).getEntity(InstancesResult.class);
+                runid, colo, lifeCycles, "", "", "", 0, null, isForced, doAsUser).getEntity(InstancesResult.class);
     }
 
 
@@ -797,22 +851,23 @@ public class FalconClient extends AbstractFalconClient {
     private ClientResponse sendInstanceRequest(Instances instances, String type, String entity,
                                        String start, String end, InputStream props, String runid, String colo,
                                        List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder,
-                                       Integer offset, Integer numResults) throws FalconCLIException {
+                                       Integer offset, Integer numResults, String doAsUser) throws FalconCLIException {
 
         return sendInstanceRequest(instances, type, entity, start, end, props,
-                runid, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, null);
+                runid, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, null, doAsUser);
     }
 
     private ClientResponse sendInstanceRequest(Instances instances, String type, String entity,
                                        String start, String end, InputStream props, String runid, String colo,
-                                       List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder,
-                                       Integer offset, Integer numResults, Boolean isForced) throws FalconCLIException {
+                                       List<LifeCycle> lifeCycles, String filterBy, String orderBy,
+                                       String sortOrder, Integer offset, Integer numResults, Boolean isForced,
+                                       String doAsUser) throws FalconCLIException {
         checkType(type);
         WebResource resource = service.path(instances.path).path(type)
                 .path(entity);
 
         resource = addParamsToResource(resource, start, end, runid, colo,
-                null, null, null, filterBy, null, orderBy, sortOrder, offset, numResults, null, isForced);
+                null, null, null, filterBy, null, orderBy, sortOrder, offset, numResults, null, isForced, doAsUser);
 
         if (lifeCycles != null) {
             checkLifeCycleOption(lifeCycles, type);
@@ -881,11 +936,12 @@ public class FalconClient extends AbstractFalconClient {
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     private EntityList sendListRequest(Entities entities, String entityType, String fields, String nameSubsequence,
                                        String tagKeywords, String filterBy, String filterTags, String orderBy,
-                                       String sortOrder, Integer offset, Integer numResults) throws FalconCLIException {
+                                       String sortOrder, Integer offset, Integer numResults, String doAsUser
+                                       ) throws FalconCLIException {
         WebResource resource = service.path(entities.path)
                 .path(entityType);
         resource = addParamsToResource(resource, null, null, null, null, fields, nameSubsequence, tagKeywords,
-                filterBy, filterTags, orderBy, sortOrder, offset, numResults, null, null);
+                filterBy, filterTags, orderBy, sortOrder, offset, numResults, null, null, doAsUser);
 
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
@@ -898,8 +954,14 @@ public class FalconClient extends AbstractFalconClient {
     }
     // RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
-    private String sendAdminRequest(AdminOperations job) throws FalconCLIException {
-        ClientResponse clientResponse = service.path(job.path)
+    private String sendAdminRequest(AdminOperations job, String doAsUser) throws FalconCLIException {
+        WebResource resource = service.path(job.path);
+
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
+
+        ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(job.mimeType)
                 .type(job.mimeType)
@@ -910,7 +972,8 @@ public class FalconClient extends AbstractFalconClient {
     private String sendMetadataDiscoveryRequest(final MetadataOperations operation,
                                                 final String dimensionType,
                                                 final String dimensionName,
-                                                final String cluster) throws FalconCLIException {
+                                                final String cluster,
+                                                final String doAsUser) throws FalconCLIException {
         WebResource resource;
         switch (operation) {
         case LIST:
@@ -934,6 +997,10 @@ public class FalconClient extends AbstractFalconClient {
             resource = resource.queryParam(FalconMetadataCLI.CLUSTER_OPT, cluster);
         }
 
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
+
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(operation.mimeType).type(operation.mimeType)
@@ -960,24 +1027,27 @@ public class FalconClient extends AbstractFalconClient {
 
     }
 
-    public String getVertex(String id) throws FalconCLIException {
-        return sendMetadataLineageRequest(MetadataOperations.VERTICES, id);
+    public String getVertex(String id, String doAsUser) throws FalconCLIException {
+        return sendMetadataLineageRequest(MetadataOperations.VERTICES, id, doAsUser);
     }
 
-    public String getVertices(String key, String value) throws FalconCLIException {
-        return sendMetadataLineageRequest(MetadataOperations.VERTICES, key, value);
+    public String getVertices(String key, String value, String doAsUser) throws FalconCLIException {
+        return sendMetadataLineageRequest(MetadataOperations.VERTICES, key, value, doAsUser);
     }
 
-    public String getVertexEdges(String id, String direction) throws FalconCLIException {
-        return sendMetadataLineageRequestForEdges(MetadataOperations.VERTICES, id, direction);
+    public String getVertexEdges(String id, String direction, String doAsUser) throws FalconCLIException {
+        return sendMetadataLineageRequestForEdges(MetadataOperations.VERTICES, id, direction, doAsUser);
     }
 
-    public String getEdge(String id) throws FalconCLIException {
-        return sendMetadataLineageRequest(MetadataOperations.EDGES, id);
+    public String getEdge(String id, String doAsUser) throws FalconCLIException {
+        return sendMetadataLineageRequest(MetadataOperations.EDGES, id, doAsUser);
     }
 
-    public APIResult submitRecipe(String recipeName, String recipeToolClassName,
-                                  final String recipeOperation, Boolean skipDryRun) throws FalconCLIException {
+    public APIResult submitRecipe(String recipeName,
+                                  String recipeToolClassName,
+                                  final String recipeOperation,
+                                  Boolean skipDryRun,
+                                  final String doAsUser) throws FalconCLIException {
         String recipePath = clientProperties.getProperty("falcon.recipe.path");
 
         if (StringUtils.isEmpty(recipePath)) {
@@ -1023,16 +1093,21 @@ public class FalconClient extends AbstractFalconClient {
             } else {
                 RecipeTool.main(args);
             }
-            validate(EntityType.PROCESS.toString(), processFile, skipDryRun);
-            return submitAndSchedule(EntityType.PROCESS.toString(), processFile, skipDryRun);
+            validate(EntityType.PROCESS.toString(), processFile, skipDryRun, doAsUser);
+            return submitAndSchedule(EntityType.PROCESS.toString(), processFile, skipDryRun, doAsUser);
         } catch (Exception e) {
             throw new FalconCLIException(e.getMessage(), e);
         }
     }
 
-    private String sendMetadataLineageRequest(MetadataOperations job, String id) throws FalconCLIException {
-        ClientResponse clientResponse = service.path(job.path)
-                .path(id)
+    private String sendMetadataLineageRequest(MetadataOperations job, String id,
+                                              String doAsUser) throws FalconCLIException {
+        WebResource resource = service.path(job.path).path(id);
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
+
+        ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(job.mimeType)
                 .type(job.mimeType)
@@ -1041,9 +1116,13 @@ public class FalconClient extends AbstractFalconClient {
     }
 
     private String sendMetadataLineageRequest(MetadataOperations job, String key,
-                                              String value) throws FalconCLIException {
-        ClientResponse clientResponse = service.path(job.path)
-                .queryParam("key", key)
+                                              String value, String doAsUser) throws FalconCLIException {
+        WebResource resource = service.path(job.path);
+
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
+        ClientResponse clientResponse = resource.queryParam("key", key)
                 .queryParam("value", value)
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(job.mimeType)
@@ -1053,10 +1132,15 @@ public class FalconClient extends AbstractFalconClient {
     }
 
     private String sendMetadataLineageRequestForEdges(MetadataOperations job, String id,
-                                                      String direction) throws FalconCLIException {
-        ClientResponse clientResponse = service.path(job.path)
-                .path(id)
-                .path(direction)
+                                                      String direction, String doAsUser) throws FalconCLIException {
+        WebResource resource = service.path(job.path)
+                .path(id).path(direction);
+
+        if (StringUtils.isNotEmpty(doAsUser)) {
+            resource = resource.queryParam(FalconCLI.DO_AS_OPT, doAsUser);
+        }
+
+        ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(job.mimeType)
                 .type(job.mimeType)

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8fbec9f/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index 4aed5d7..e7c1594 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -19,6 +19,8 @@
 package org.apache.falcon.security;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.service.ProxyUserService;
+import org.apache.falcon.service.Services;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +68,40 @@ public final class CurrentUser {
     }
 
     /**
+     * Makes the authenticated user act as {@code doAsUser} once the proxy request passes validation.
+     * Nothing changes when doAsUser is empty or equals, ignoring case, the authenticated user.
+     *
+     * @param doAsUser user to impersonate
+     * @param proxyHost host the proxy request was made from; required when impersonating
+     * @throws IOException declared by the signature; validation IOExceptions surface as RuntimeException
+     */
+    public static void proxyDoAsUser(final String doAsUser,
+                                     final String proxyHost) throws IOException {
+        if (!isAuthenticated()) {
+            throw new IllegalStateException("Authentication not done");
+        }
+
+        final String authenticated = CURRENT_USER.get().authenticatedUser;
+        if (StringUtils.isEmpty(doAsUser) || doAsUser.equalsIgnoreCase(authenticated)) {
+            return;
+        }
+        if (StringUtils.isEmpty(proxyHost)) {
+            throw new IllegalArgumentException("proxy host cannot be null or empty");
+        }
+        ProxyUserService validator = Services.get().getService(ProxyUserService.SERVICE_NAME);
+        try {
+            validator.validate(authenticated, proxyHost, doAsUser);
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+        final CurrentUser holder = CURRENT_USER.get();
+        final String template = "Authenticated user {} is proxying doAs user {} from host {}";
+        LOG.info(template, holder.authenticatedUser, doAsUser, proxyHost);
+        AUDIT.info(template, holder.authenticatedUser, doAsUser, proxyHost);
+        holder.proxyUser = doAsUser;
+    }
+
+    /**
      * Captures the entity owner if authenticated user is a super user.
      *
      * @param aclOwner entity acl owner
@@ -80,9 +116,9 @@ public final class CurrentUser {
 
         CurrentUser user = CURRENT_USER.get();
         LOG.info("Authenticated user {} is proxying entity owner {}/{}",
-            user.authenticatedUser, aclOwner, aclGroup);
+                user.authenticatedUser, aclOwner, aclGroup);
         AUDIT.info("Authenticated user {} is proxying entity owner {}/{}",
-            user.authenticatedUser, aclOwner, aclGroup);
+                user.authenticatedUser, aclOwner, aclGroup);
         user.proxyUser = aclOwner;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8fbec9f/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
index 861f80f..123a1a2 100644
--- a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
+++ b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
@@ -18,12 +18,15 @@
 
 package org.apache.falcon.security;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -55,6 +58,7 @@ public final class SecurityUtil {
      */
     public static final String HIVE_METASTORE_PRINCIPAL = "hive.metastore.kerberos.principal";
 
+    private static final Logger LOG = LoggerFactory.getLogger(SecurityUtil.class);
 
     private SecurityUtil() {
     }
@@ -107,11 +111,19 @@ public final class SecurityUtil {
         return ReflectionUtils.getInstanceByClassName(providerClassName);
     }
 
-    public static void tryProxy(Entity entity) throws IOException, FalconException {
+    public static void tryProxy(Entity entity, final String doAsUser) throws IOException, FalconException {
         if (entity != null && entity.getACL() != null && SecurityUtil.isAuthorizationEnabled()) {
             final String aclOwner = entity.getACL().getOwner();
             final String aclGroup = entity.getACL().getGroup();
 
+            if (StringUtils.isNotEmpty(doAsUser)) {
+                if (!doAsUser.equalsIgnoreCase(aclOwner)) {
+                    LOG.warn("doAs user {} not same as acl owner {}. Ignoring acl owner.", doAsUser, aclOwner);
+                    throw new FalconException("doAs user and ACL owner mismatch. doAs user " + doAsUser
+                            +  " should be same as ACL owner " + aclOwner);
+                }
+                return;
+            }
             if (SecurityUtil.getAuthorizationProvider().shouldProxy(
                     CurrentUser.getAuthenticatedUGI(), aclOwner, aclGroup)) {
                 CurrentUser.proxy(aclOwner, aclGroup);

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8fbec9f/common/src/main/java/org/apache/falcon/service/GroupsService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/GroupsService.java b/common/src/main/java/org/apache/falcon/service/GroupsService.java
new file mode 100644
index 0000000..dd4d946
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/service/GroupsService.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Groups;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Falcon service that resolves the groups of a user by delegating to Hadoop's
+ * <code>org.apache.hadoop.security.Groups</code>.
+ */
+public class GroupsService implements FalconService {
+    public static final String SERVICE_NAME = GroupsService.class.getSimpleName();
+
+    private Groups hadoopGroups;
+
+    @Override
+    public String getName() {
+        return SERVICE_NAME;
+    }
+
+    /**
+     * Initializes the service by creating the Hadoop Groups instance it delegates to.
+     */
+    @Override
+    public void init() {
+        final Configuration hadoopConf = new Configuration(true);
+        hadoopGroups = new Groups(hadoopConf);
+    }
+
+    /**
+     * Destroys the service; this implementation has nothing to release.
+     */
+    @Override
+    public void destroy() {
+    }
+
+    /**
+     * Looks up the groups of a user.
+     *
+     * @param user user name.
+     * @return the groups the given user belongs to.
+     * @throws java.io.IOException thrown if there was an error retrieving the groups of the user.
+     */
+    public List<String> getGroups(String user) throws IOException {
+        return hadoopGroups.getGroups(user);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8fbec9f/common/src/main/java/org/apache/falcon/service/ProxyUserService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/ProxyUserService.java b/common/src/main/java/org/apache/falcon/service/ProxyUserService.java
new file mode 100644
index 0000000..0ad6663
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/service/ProxyUserService.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.util.RuntimeProperties;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.AccessControlException;
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The ProxyUserService checks if a user of a request has proxyuser privileges.
+ * <p>
+ * This check is based on the following criteria:
+ * <ul>
+ *     <li>The user of the request must be configured as proxy user in Falcon runtime properties.</li>
+ *     <li>The user of the request must be making the request from a whitelisted host.</li>
+ *     <li>The user of the request must be making the request on behalf of a user of a whitelisted group.</li>
+ * </ul>
+ */
+public class ProxyUserService implements FalconService {
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyUserService.class);
+
+    // Keyed by proxy user name; a null value means the '*' wildcard, i.e. any host / any group is accepted.
+    private Map<String, Set<String>> proxyUserHosts = new HashMap<>();
+    private Map<String, Set<String>> proxyUserGroups = new HashMap<>();
+
+    private static final String CONF_PREFIX = "falcon.service.ProxyUserService.proxyuser.";
+    private static final String GROUPS = ".groups";
+    private static final String HOSTS = ".hosts";
+    public static final String SERVICE_NAME = ProxyUserService.class.getSimpleName();
+
+    @Override
+    public String getName() {
+        return SERVICE_NAME;
+    }
+
+    /**
+     * Initializes the service by loading every proxyuser hosts/groups pair from the runtime properties.
+     * @throws FalconException thrown if the service could not be configured correctly.
+     */
+    @Override
+    public void init() throws FalconException {
+        for (Map.Entry<Object, Object> entry : RuntimeProperties.get().entrySet()) {
+            String key = (String) entry.getKey();
+            if (key.startsWith(CONF_PREFIX) && key.endsWith(GROUPS)) {
+                // Until the prefix is stripped below, proxyUser is the full property stem (prefix + user).
+                String proxyUser = key.substring(0, key.lastIndexOf(GROUPS));
+                if (RuntimeProperties.get().getProperty(proxyUser + HOSTS) == null) {
+                    throw new FalconException(proxyUser + HOSTS + " property not set in runtime "
+                            + "properties. Please add it.");
+                }
+                proxyUser = proxyUser.substring(CONF_PREFIX.length());
+                String value = ((String) entry.getValue()).trim();
+                LOG.info("Loading proxyuser settings [{}]=[{}]", key, value);
+                Set<String> values = null;
+                if (!value.equals("*")) {
+                    // Split on commas and drop the whitespace around each group name ("a, b" -> "a", "b").
+                    values = new HashSet<>(Arrays.asList(value.split("\\s*,\\s*")));
+                }
+                proxyUserGroups.put(proxyUser, values);
+            }
+            if (key.startsWith(CONF_PREFIX) && key.endsWith(HOSTS)) {
+                // Until the prefix is stripped below, proxyUser is the full property stem (prefix + user).
+                String proxyUser = key.substring(0, key.lastIndexOf(HOSTS));
+                if (RuntimeProperties.get().getProperty(proxyUser + GROUPS) == null) {
+                    throw new FalconException(proxyUser + GROUPS + " property not set in runtime "
+                            + "properties. Please add it.");
+                }
+                proxyUser = proxyUser.substring(CONF_PREFIX.length());
+                String value = ((String) entry.getValue()).trim();
+                LOG.info("Loading proxyuser settings [{}]=[{}]", key, value);
+                Set<String> values = null;
+                if (!value.equals("*")) {
+                    // Split on commas and drop the whitespace around each host name before resolving it.
+                    String[] hosts = value.split("\\s*,\\s*");
+                    for (int i = 0; i < hosts.length; i++) {
+                        String hostName = hosts[i];
+                        try {
+                            hosts[i] = normalizeHostname(hostName);
+                        } catch (Exception ex) {
+                            throw new FalconException("Exception normalizing host name: " + hostName + "."
+                                    + ex.getMessage(), ex);
+                        }
+                        LOG.info("Hostname, original [{}], normalized [{}]", hostName, hosts[i]);
+                    }
+                    values = new HashSet<>(Arrays.asList(hosts));
+                }
+                proxyUserHosts.put(proxyUser, values);
+            }
+        }
+    }
+
+    /**
+     * Verifies a proxyuser.
+     *
+     * @param proxyUser user name of the proxy user.
+     * @param proxyHost host the proxy user is making the request from.
+     * @param doAsUser user the proxy user is impersonating.
+     * @throws java.io.IOException thrown if an error during the validation has occurred.
+     * @throws java.security.AccessControlException thrown if the user is not allowed to perform the proxyuser request.
+     */
+    public void validate(String proxyUser, String proxyHost, String doAsUser) throws IOException {
+        validateNotEmpty(proxyUser, "proxyUser",
+                "If you're attempting to use user-impersonation via a proxy user, please make sure that "
+                        + "falcon.service.ProxyUserService.proxyuser.#USER#.hosts and "
+                        + "falcon.service.ProxyUserService.proxyuser.#USER#.groups are configured correctly"
+        );
+        validateNotEmpty(proxyHost, "proxyHost",
+                "If you're attempting to use user-impersonation via a proxy user, please make sure that "
+                        + "falcon.service.ProxyUserService.proxyuser." + proxyUser + ".hosts and "
+                        + "falcon.service.ProxyUserService.proxyuser." + proxyUser + ".groups are configured correctly"
+        );
+        validateNotEmpty(doAsUser, "doAsUser", null);
+        LOG.debug("Authorization check proxyuser [{}] host [{}] doAs [{}]",
+                proxyUser, proxyHost, doAsUser);
+        if (proxyUserHosts.containsKey(proxyUser)) {
+            validateRequestorHost(proxyUser, proxyHost, proxyUserHosts.get(proxyUser));
+            validateGroup(proxyUser, doAsUser, proxyUserGroups.get(proxyUser));
+        } else {
+            throw new AccessControlException(MessageFormat.format("User [{0}] not defined as proxyuser. Please add it"
+                            + " to runtime properties.", proxyUser));
+        }
+    }
+
+    /** Rejects the request unless validHosts is null (wildcard) or contains the raw or normalized hostname. */
+    private void validateRequestorHost(String proxyUser, String hostname, Set<String> validHosts)
+        throws IOException {
+        if (validHosts != null
+                && !validHosts.contains(hostname) && !validHosts.contains(normalizeHostname(hostname))) {
+            throw new AccessControlException(MessageFormat.format("Unauthorized host [{0}] for proxyuser [{1}]",
+                    hostname, proxyUser));
+        }
+    }
+
+    /** Rejects the request unless validGroups is null (wildcard) or the user belongs to one of those groups. */
+    private void validateGroup(String proxyUser, String user, Set<String> validGroups) throws IOException {
+        if (validGroups != null) {
+            List<String> userGroups =  Services.get().<GroupsService>getService(GroupsService.SERVICE_NAME)
+            .getGroups(user);
+            for (String g : validGroups) {
+                if (userGroups.contains(g)) {
+                    return;
+                }
+            }
+            throw new AccessControlException(
+                    MessageFormat.format("Unauthorized proxyuser [{0}] for user [{1}], not in proxyuser groups",
+                            proxyUser, user));
+        }
+    }
+
+    /** Resolves name and returns its canonical host name; resolution failures become AccessControlException. */
+    private String normalizeHostname(String name) {
+        try {
+            return InetAddress.getByName(name).getCanonicalHostName();
+        } catch (IOException ex) {
+            throw new AccessControlException(MessageFormat.format("Could not resolve host [{0}], [{1}]", name,
+                    ex.getMessage()));
+        }
+    }
+
+    /** Throws IllegalArgumentException when str is blank; info, if given, is appended to the message. */
+    private static void validateNotEmpty(String str, String name, String info) {
+        if (StringUtils.isBlank(str)) {
+            throw new IllegalArgumentException(name + " cannot be null or empty" + (info == null ? "" : ", " + info));
+        }
+    }
+
+    /**
+     * Destroys the service.
+     */
+    @Override
+    public void destroy() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8fbec9f/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
index 23ecc16..f499dd9 100644
--- a/common/src/main/resources/runtime.properties
+++ b/common/src/main/resources/runtime.properties
@@ -31,4 +31,20 @@
 *.feed.late.policy=exp-backoff
 
 # If true, Falcon skips oozie dryrun while scheduling entities.
-*.falcon.skip.dryrun=false
\ No newline at end of file
+*.falcon.skip.dryrun=false
+
+######### Proxyuser Configuration Start #########
+
+#List of hosts the '#USER#' user is allowed to perform 'doAs' operations from. The '#USER#' must be replaced with the
+#username of the user who is allowed to perform 'doAs' operations. The value can be the '*' wildcard or a list of
+#comma separated hostnames
+
+*.falcon.service.ProxyUserService.proxyuser.#USER#.hosts=*
+
+#List of groups whose users the '#USER#' user is allowed to perform 'doAs' operations for. The '#USER#' must be replaced with the
+#username of the user who is allowed to perform 'doAs' operations. The value can be the '*' wildcard or a list of
+#comma separated groups
+
+*.falcon.service.ProxyUserService.proxyuser.#USER#.groups=*
+
+######### Proxyuser Configuration End #########
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/d8fbec9f/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index c48188c..0593b96 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -37,7 +37,9 @@
                         org.apache.falcon.rerun.service.RetryService,\
                         org.apache.falcon.rerun.service.LateRunService,\
                         org.apache.falcon.metadata.MetadataMappingService,\
-                        org.apache.falcon.service.LogCleanupService
+                        org.apache.falcon.service.LogCleanupService,\
+                        org.apache.falcon.service.GroupsService,\
+                        org.apache.falcon.service.ProxyUserService
 
 ##### Falcon Configuration Store Change listeners #####
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\


Mime
View raw message