falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [3/5] FALCON-11 Add support for security in Falcon. Contributed by Venkatesh Seetharam
Date Sun, 16 Feb 2014 03:31:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
index 48d4589..0c338da 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.resource.InstancesResult.Instance;
 import org.apache.falcon.resource.InstancesResult.InstanceAction;
 import org.apache.hadoop.conf.Configuration;
@@ -48,15 +49,12 @@ public final class LogProvider {
 
         Cluster clusterObj = ConfigurationStore.get().get(
                 EntityType.CLUSTER, instance.cluster);
-        String resolvedRunId = "-";
         try {
-            FileSystem fs = FileSystem.get(
-                    new Path(ClusterHelper.getStorageUrl(clusterObj)).toUri(),
-                    new Configuration());
-            resolvedRunId = getResolvedRunId(fs, clusterObj, entity, instance,
-                    runId);
-            // if runId param is not resolved, i.e job is killed or not started
-            // or running
+            Configuration conf = ClusterHelper.getConfiguration(clusterObj);
+            // fs on behalf of the end user.
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+            String resolvedRunId = getResolvedRunId(fs, clusterObj, entity, instance, runId);
+            // if runId param is not resolved, i.e job is killed or not started or running
             if (resolvedRunId.equals("-")
                     && StringUtils.isEmpty(instance.logFile)) {
                 instance.logFile = "-";
@@ -107,13 +105,13 @@ public final class LogProvider {
     }
 
     private Instance populateActionLogUrls(FileSystem fs, Cluster cluster,
-                                           Entity entity, Instance instance, String formatedRunId)
+                                           Entity entity, Instance instance, String formattedRunId)
         throws FalconException, OozieClientException, IOException {
 
         Path actionPaths = new Path(ClusterHelper.getStorageUrl(cluster),
                 EntityUtil.getLogPath(cluster, entity) + "/job-"
                         + EntityUtil.fromUTCtoURIDate(instance.instance) + "/"
-                        + formatedRunId + "/*");
+                        + formattedRunId + "/*");
         FileStatus[] actions = fs.globStatus(actionPaths);
         InstanceAction[] instanceActions = new InstanceAction[actions.length - 1];
         instance.actions = instanceActions;
@@ -124,7 +122,7 @@ public final class LogProvider {
                     ClusterHelper.getStorageUrl(cluster),
                     EntityUtil.getLogPath(cluster, entity) + "/job-"
                             + EntityUtil.fromUTCtoURIDate(instance.instance) + "/"
-                            + formatedRunId, file.getPath().getName());
+                            + formattedRunId, file.getPath().getName());
             if (filePath.getName().equals("oozie.log")) {
                 instance.logFile = dfsBrowserUrl;
                 continue;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index 37f8cfa..63c16ad 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -94,14 +95,13 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
         }
 
         LOG.debug("Copying libs from " + src);
-        Configuration conf = ClusterHelper.getConfiguration(cluster);
-        conf.setInt("ipc.client.connect.max.retries", 10);
-        FileSystem fs = null;
+        FileSystem fs;
         try {
-            fs = FileSystem.get(conf);
+            fs = getFileSystem(cluster);
+            fs.getConf().set("dfs.umaskmode", "022");  // drwxr-xr-x
         } catch (Exception e) {
             throw new FalconException("Unable to connect to HDFS: "
-                    + ClusterHelper.getStorageUrl(cluster));
+                    + ClusterHelper.getStorageUrl(cluster), e);
         }
         if (!fs.exists(target)) {
             fs.mkdirs(target);
@@ -137,6 +137,15 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
         }
     }
 
+    // the dir is owned by Falcon but world-readable
+    private static FileSystem getFileSystem(Cluster cluster)
+        throws FalconException, IOException {
+        Configuration conf = ClusterHelper.getConfiguration(cluster);
+        conf.setInt("ipc.client.connect.max.retries", 10);
+
+        return HadoopClientFactory.get().createFileSystem(conf);
+    }
+
     @Override
     public void onAdd(Entity entity, boolean ignoreFailure) throws FalconException {
         if (entity.getEntityType() != EntityType.CLUSTER) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 3f9256c..c19c89c 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -23,6 +23,7 @@ import org.apache.falcon.logging.LogMover;
 import org.apache.falcon.messaging.MessageProducer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
@@ -61,7 +62,8 @@ public class FalconPostProcessing extends Configured implements Tool {
         WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie"),
         USER_SUBFLOW_ID("subflowId", "external id of user workflow"),
         USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"),
-        LOG_DIR("logDir", "log dir where job logs are copied");
+        LOG_DIR("logDir", "log dir where job logs are copied"),
+        WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)");
 
         private String name;
         private String description;
@@ -96,7 +98,7 @@ public class FalconPostProcessing extends Configured implements Tool {
         LOG.info("Sending user message " + cmd);
         invokeUserMessageProducer(cmd);
 
-        //LogMover doesnt throw exception, a failed logmover will not fail the user workflow
+        //LogMover doesn't throw exception, a failed log mover will not fail the user workflow
         LOG.info("Moving logs " + cmd);
         invokeLogProducer(cmd);
 
@@ -155,11 +157,18 @@ public class FalconPostProcessing extends Configured implements Tool {
         addArg(args, cmd, Arg.FEED_NAMES);
         addArg(args, cmd, Arg.FEED_INSTANCE_PATHS);
         addArg(args, cmd, Arg.LOG_FILE);
+        addArg(args, cmd, Arg.WORKFLOW_USER);
 
         MessageProducer.main(args.toArray(new String[0]));
     }
 
     private void invokeLogProducer(CommandLine cmd) throws Exception {
+        // todo: need to move this out to Falcon in-process
+        if (UserGroupInformation.isSecurityEnabled()) {
+            LOG.info("Unable to move logs as security is enabled.");
+            return;
+        }
+
         List<String> args = new ArrayList<String>();
         addArg(args, cmd, Arg.WF_ENGINE_URL);
         addArg(args, cmd, Arg.ENTITY_TYPE);
@@ -204,6 +213,8 @@ public class FalconPostProcessing extends Configured implements Tool {
         addOption(options, Arg.USER_SUBFLOW_ID);
         addOption(options, Arg.USER_WORKFLOW_ENGINE, false);
         addOption(options, Arg.LOG_DIR);
+        addOption(options, Arg.WORKFLOW_USER);
+
         return new GnuParser().parse(options, arguments);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
index b757531..d6dd2af 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
@@ -24,8 +24,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.log4j.Logger;
-import org.apache.oozie.client.CustomOozieClient;
-import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.ProxyOozieClient;
 
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -35,42 +34,43 @@ import java.util.concurrent.ConcurrentHashMap;
 public final class OozieClientFactory {
 
     private static final Logger LOG = Logger.getLogger(OozieClientFactory.class);
-
-    private static final ConcurrentHashMap<String, OozieClient> CACHE =
-        new ConcurrentHashMap<String, OozieClient>();
     private static final String LOCAL_OOZIE = "local";
+
+    private static final ConcurrentHashMap<String, ProxyOozieClient> CACHE =
+            new ConcurrentHashMap<String, ProxyOozieClient>();
     private static volatile boolean localInitialized = false;
 
     private OozieClientFactory() {}
 
-    public static synchronized OozieClient get(Cluster cluster)
+    public static synchronized ProxyOozieClient get(Cluster cluster)
         throws FalconException {
 
         assert cluster != null : "Cluster cant be null";
         String oozieUrl = ClusterHelper.getOozieUrl(cluster);
         if (!CACHE.containsKey(oozieUrl)) {
-            OozieClient ref = getClientRef(oozieUrl);
+            ProxyOozieClient ref = getClientRef(oozieUrl);
             LOG.info("Caching Oozie client object for " + oozieUrl);
             CACHE.putIfAbsent(oozieUrl, ref);
         }
+
         return CACHE.get(oozieUrl);
     }
 
-    public static OozieClient get(String cluster) throws FalconException {
-        return get((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, cluster));
+    public static ProxyOozieClient get(String clusterName) throws FalconException {
+        return get((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, clusterName));
     }
 
-    private static OozieClient getClientRef(String oozieUrl)
+    private static ProxyOozieClient getClientRef(String oozieUrl)
         throws FalconException {
 
         if (LOCAL_OOZIE.equals(oozieUrl)) {
             return getLocalOozieClient();
         } else {
-            return new CustomOozieClient(oozieUrl);
+            return new ProxyOozieClient(oozieUrl);
         }
     }
 
-    private static OozieClient getLocalOozieClient() throws FalconException {
+    private static ProxyOozieClient getLocalOozieClient() throws FalconException {
         try {
             if (!localInitialized) {
                 //LocalOozie.start();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
index 068e980..a930bb7 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -56,7 +57,7 @@ public class OozieHouseKeepingService implements WorkflowEngineActionListener {
             LOG.info("Deleting entity path " + entityPath + " on cluster " + clusterName);
 
             Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = FileSystem.get(conf);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
             if (fs.exists(entityPath) && !fs.delete(entityPath, true)) {
                 throw new FalconException("Unable to cleanup entity path: " + entityPath);
             }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 71ff430..cea73bd 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -27,6 +27,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.*;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesResult.Instance;
@@ -54,8 +55,7 @@ import java.util.Map.Entry;
  */
 public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
-    private static final Logger LOG = Logger
-            .getLogger(OozieWorkflowEngine.class);
+    private static final Logger LOG = Logger.getLogger(OozieWorkflowEngine.class);
 
     public static final String ENGINE = "oozie";
     private static final BundleJob MISSING = new NullBundleJob();
@@ -130,8 +130,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private void commitStagingPath(String cluster, String path) throws FalconException {
         path = StringUtils.removeStart(path, "${nameNode}");
-        FileSystem fs =
-                ClusterHelper.getFileSystem((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, cluster));
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(clusterEntity));
         try {
             fs.create(new Path(path, EntityUtil.SUCCEEDED_FILE_NAME)).close();
         } catch (IOException e) {
@@ -187,10 +187,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     //Return all bundles for the entity in the requested cluster
-    private List<BundleJob> findBundles(Entity entity, String cluster) throws FalconException {
+    private List<BundleJob> findBundles(Entity entity, String clusterName) throws FalconException {
+
         try {
-            OozieClient client = OozieClientFactory.get(cluster);
-            List<BundleJob> jobs = client.getBundleJobsInfo(
+            List<BundleJob> jobs = OozieClientFactory.get(clusterName).getBundleJobsInfo(
                     OozieClient.FILTER_NAME + "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
             if (jobs != null) {
                 List<BundleJob> filteredJobs = new ArrayList<BundleJob>();
@@ -317,22 +317,23 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return "SUCCESS";
     }
 
-    private void killBundle(String cluster, BundleJob job) throws FalconException {
-        OozieClient client = OozieClientFactory.get(cluster);
+    private void killBundle(String clusterName, BundleJob job) throws FalconException {
+        ProxyOozieClient client = OozieClientFactory.get(clusterName);
         try {
             //kill all coords
             for (CoordinatorJob coord : job.getCoordinators()) {
                 client.kill(coord.getId());
-                LOG.debug("Killed coord " + coord.getId() + " on cluster " + cluster);
+                LOG.debug("Killed coord " + coord.getId() + " on cluster " + clusterName);
             }
 
             //set end time of bundle
-            client.change(job.getId(), OozieClient.CHANGE_VALUE_ENDTIME + "=" + SchemaHelper.formatDateUTC(new Date()));
-            LOG.debug("Changed end time of bundle " + job.getId() + " on cluster " + cluster);
+            client.change(job.getId(),
+                    OozieClient.CHANGE_VALUE_ENDTIME + "=" + SchemaHelper.formatDateUTC(new Date()));
+            LOG.debug("Changed end time of bundle " + job.getId() + " on cluster " + clusterName);
 
             //kill bundle
             client.kill(job.getId());
-            LOG.debug("Killed bundle " + job.getId() + " on cluster " + cluster);
+            LOG.debug("Killed bundle " + job.getId() + " on cluster " + clusterName);
         } catch (OozieClientException e) {
             throw new FalconException(e);
         }
@@ -383,7 +384,6 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     @Override
     public InstancesResult getRunningInstances(Entity entity)
         throws FalconException {
-
         try {
             WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(
                     ENGINE, entity);
@@ -400,7 +400,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             }
 
             for (String cluster : clusters) {
-                OozieClient client = OozieClientFactory.get(cluster);
+                ProxyOozieClient client = OozieClientFactory.get(cluster);
                 List<WorkflowJob> wfs = getRunningWorkflows(cluster, coordNames);
                 if (wfs != null) {
                     for (WorkflowJob job : wfs) {
@@ -476,10 +476,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private WorkflowJob getWorkflowInfo(String cluster, String wfId)
         throws FalconException {
-
-        OozieClient client = OozieClientFactory.get(cluster);
         try {
-            return client.getJobInfo(wfId);
+            return OozieClientFactory.get(cluster).getJobInfo(wfId);
         } catch (OozieClientException e) {
             throw new FalconException(e);
         }
@@ -556,7 +554,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             }
 
             List<BundleJob> bundles = entry.getValue();
-            OozieClient client = OozieClientFactory.get(cluster);
+            ProxyOozieClient client = OozieClientFactory.get(cluster);
             List<CoordinatorJob> applicableCoords = getApplicableCoords(entity, client, start, end, bundles);
             long unscheduledInstances = 0;
             boolean isLastCoord = false;
@@ -725,18 +723,14 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                  + Arrays.toString(statuses));
     }
 
-    private String getSourceCluster(String cluster,
-                                    CoordinatorAction coordinatorAction, Entity entity)
+    private String getSourceCluster(String cluster, CoordinatorAction coordinatorAction, Entity entity)
         throws FalconException {
-
-        OozieClient client = OozieClientFactory.get(cluster);
-        CoordinatorJob coordJob;
         try {
-            coordJob = client.getCoordJobInfo(coordinatorAction.getJobId());
+            CoordinatorJob coordJob = OozieClientFactory.get(cluster).getCoordJobInfo(coordinatorAction.getJobId());
+            return EntityUtil.getWorkflowNameSuffix(coordJob.getAppName(), entity);
         } catch (OozieClientException e) {
             throw new FalconException("Unable to get oozie job id:" + e);
         }
-        return EntityUtil.getWorkflowNameSuffix(coordJob.getAppName(), entity);
     }
 
     private List<String> getIncludedClusters(Properties props,
@@ -776,7 +770,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         for (Map.Entry<String, List<BundleJob>> entry : bundlesMap.entrySet()) {
             String cluster = entry.getKey();
             List<BundleJob> bundles = entry.getValue();
-            OozieClient client = OozieClientFactory.get(cluster);
+            ProxyOozieClient client = OozieClientFactory.get(cluster);
             List<CoordinatorJob> applicableCoords = getApplicableCoords(entity, client, start, end, bundles);
             List<CoordinatorAction> actions = new ArrayList<CoordinatorAction>();
 
@@ -836,8 +830,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private List<CoordinatorJob> getApplicableCoords(Entity entity,
-                                                     OozieClient client, Date start, Date end, List<BundleJob> bundles)
+    private List<CoordinatorJob> getApplicableCoords(Entity entity, ProxyOozieClient client,
+                                                     Date start, Date end, List<BundleJob> bundles)
         throws FalconException {
 
         List<CoordinatorJob> applicableCoords = new ArrayList<CoordinatorJob>();
@@ -1113,10 +1107,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private BundleJob getBundleInfo(String cluster, String bundleId)
         throws FalconException {
-
-        OozieClient client = OozieClientFactory.get(cluster);
         try {
-            return client.getBundleJobInfo(bundleId);
+            return OozieClientFactory.get(cluster).getBundleJobInfo(bundleId);
         } catch (OozieClientException e) {
             throw new FalconException(e);
         }
@@ -1132,9 +1124,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                     .append(wfName);
         }
 
-        OozieClient client = OozieClientFactory.get(cluster);
         try {
-            return client.getJobsInfo(filter.toString(), 1, 1000);
+            return OozieClientFactory.get(cluster).getJobsInfo(filter.toString(), 1, 1000);
         } catch (OozieClientException e) {
             throw new FalconException(e);
         }
@@ -1144,7 +1135,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     public void reRun(String cluster, String jobId, Properties props)
         throws FalconException {
 
-        OozieClient client = OozieClientFactory.get(cluster);
+        ProxyOozieClient client = OozieClientFactory.get(cluster);
         try {
             WorkflowJob jobInfo = client.getJobInfo(jobId);
             Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
@@ -1200,7 +1191,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     public String getWorkflowStatus(String cluster, String jobId)
         throws FalconException {
 
-        OozieClient client = OozieClientFactory.get(cluster);
+        ProxyOozieClient client = OozieClientFactory.get(cluster);
         try {
             if (jobId.endsWith("-W")) {
                 WorkflowJob jobInfo = client.getJobInfo(jobId);
@@ -1231,9 +1222,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private String run(String cluster, Properties props) throws FalconException {
-        OozieClient client = OozieClientFactory.get(cluster);
         try {
-            String jobId = client.run(props);
+            String jobId = OozieClientFactory.get(cluster).run(props);
             LOG.info("Submitted " + jobId + " on cluster " + cluster
                     + " with properties : " + props);
             return jobId;
@@ -1244,9 +1234,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private void suspend(String cluster, String jobId) throws FalconException {
-        OozieClient client = OozieClientFactory.get(cluster);
         try {
-            client.suspend(jobId);
+            OozieClientFactory.get(cluster).suspend(jobId);
             assertStatus(cluster, jobId, Status.PREPSUSPENDED, Status.SUSPENDED, Status.SUCCEEDED,
                     Status.FAILED, Status.KILLED);
             LOG.info("Suspended job " + jobId + " on cluster " + cluster);
@@ -1256,9 +1245,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private void resume(String cluster, String jobId) throws FalconException {
-        OozieClient client = OozieClientFactory.get(cluster);
         try {
-            client.resume(jobId);
+            OozieClientFactory.get(cluster).resume(jobId);
             assertStatus(cluster, jobId, Status.RUNNING, Status.SUCCEEDED,
                     Status.FAILED, Status.KILLED);
             LOG.info("Resumed job " + jobId + " on cluster " + cluster);
@@ -1268,9 +1256,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private void kill(String cluster, String jobId) throws FalconException {
-        OozieClient client = OozieClientFactory.get(cluster);
         try {
-            client.kill(jobId);
+            OozieClientFactory.get(cluster).kill(jobId);
             assertStatus(cluster, jobId, Status.KILLED, Status.SUCCEEDED,
                     Status.FAILED);
             LOG.info("Killed job " + jobId + " on cluster " + cluster);
@@ -1281,10 +1268,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private void change(String cluster, String jobId, String changeValue)
         throws FalconException {
-
         try {
-            OozieClient client = OozieClientFactory.get(cluster);
-            client.change(jobId, changeValue);
+            OozieClientFactory.get(cluster).change(jobId, changeValue);
             LOG.info("Changed bundle/coord " + jobId + ": " + changeValue
                     + " on cluster " + cluster);
         } catch (OozieClientException e) {
@@ -1317,7 +1302,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
         // assert that its really changed
         try {
-            OozieClient client = OozieClientFactory.get(cluster);
+            ProxyOozieClient client = OozieClientFactory.get(cluster);
             CoordinatorJob coord = client.getCoordJobInfo(id);
             for (int counter = 0; counter < 3; counter++) {
                 Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? null
@@ -1348,9 +1333,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     @Override
     public Properties getWorkflowProperties(String cluster, String jobId) throws FalconException {
-        OozieClient client = OozieClientFactory.get(cluster);
         try {
-            WorkflowJob jobInfo = client.getJobInfo(jobId);
+            WorkflowJob jobInfo = OozieClientFactory.get(cluster).getJobInfo(jobId);
             String conf = jobInfo.getConf();
             return OozieUtils.toProperties(conf);
         } catch (Exception e) {
@@ -1361,12 +1345,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     @Override
     public InstancesResult getJobDetails(String cluster, String jobId)
         throws FalconException {
-
-        OozieClient client = OozieClientFactory.get(cluster);
         Instance[] instances = new Instance[1];
         Instance instance = new Instance();
         try {
-            WorkflowJob jobInfo = client.getJobInfo(jobId);
+            WorkflowJob jobInfo = OozieClientFactory.get(cluster).getJobInfo(jobId);
             instance.startTime = jobInfo.getStartTime();
             if (jobInfo.getStatus().name().equals(Status.RUNNING.name())) {
                 instance.endTime = new Date();
@@ -1380,6 +1362,5 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         } catch (Exception e) {
             throw new FalconException(e);
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
deleted file mode 100644
index c55221e..0000000
--- a/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.client;
-
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.rest.RestConstants;
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Wrapped Oozie Client.
- */
-public class CustomOozieClient extends OozieClient {
-
-    private static final Logger LOG = Logger.getLogger(CustomOozieClient.class);
-    private static final Map<String, String> NONE = new HashMap<String, String>();
-
-    public CustomOozieClient(String oozieUrl) {
-        super(oozieUrl);
-    }
-
-    public Properties getConfiguration() throws OozieClientException {
-        return (new OozieConfiguration(RestConstants.ADMIN_CONFIG_RESOURCE)).call();
-    }
-
-    public Properties getProperties() throws OozieClientException {
-        return (new OozieConfiguration(RestConstants.ADMIN_JAVA_SYS_PROPS_RESOURCE)).call();
-    }
-
-    @Override
-    protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException {
-        String strUrl = url.toString();
-        if (!strUrl.contains(OozieClient.USER_NAME)) { // decorate the url with the user in request
-            String paramSeparator = (strUrl.contains("?")) ? "&" : "?";
-            strUrl += paramSeparator + OozieClient.USER_NAME + "=" + CurrentUser.getUser();
-            url = new URL(strUrl);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Decorated url with user info: " + url);
-            }
-        }
-
-        HttpURLConnection conn = super.createConnection(url, method);
-
-        int connectTimeout = Integer.valueOf(RuntimeProperties.get().getProperty("oozie.connect.timeout", "1000"));
-        conn.setConnectTimeout(connectTimeout);
-
-        int readTimeout = Integer.valueOf(RuntimeProperties.get().getProperty("oozie.read.timeout", "45000"));
-        conn.setReadTimeout(readTimeout);
-
-        return conn;
-    }
-
-    private class OozieConfiguration extends ClientCallable<Properties> {
-
-        public OozieConfiguration(String resource) {
-            super("GET", RestConstants.ADMIN, resource, NONE);
-        }
-
-        @Override
-        protected Properties call(HttpURLConnection conn) throws IOException, OozieClientException {
-            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
-            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
-                Reader reader = new InputStreamReader(conn.getInputStream(), "UTF_8");
-                JSONObject json = (JSONObject) JSONValue.parse(reader);
-                Properties props = new Properties();
-                props.putAll(json);
-                return props;
-            } else {
-                handleError(conn);
-                return null;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
new file mode 100644
index 0000000..c78a83a
--- /dev/null
+++ b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java
@@ -0,0 +1,562 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.client;
+
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.hdfs.web.KerberosUgiAuthenticator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.rest.RestConstants;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.io.Reader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
+/**
+ * Wrapped Oozie Client that does proxy the requests.
+ */
+public class ProxyOozieClient extends AuthOozieClient {
+
+    private static final Logger LOG = Logger.getLogger(ProxyOozieClient.class);
+    private static final Map<String, String> NONE = new HashMap<String, String>();
+
+    private final Authenticator authenticator = new KerberosUgiAuthenticator();
+
+    public ProxyOozieClient(String oozieUrl) {
+        super(oozieUrl, SecurityUtil.getAuthenticationType());
+
+        if (org.apache.log4j.Logger.getLogger(getClass()).isDebugEnabled()) {
+            setDebugMode(1);
+        }
+    }
+
+    public Properties getConfiguration() throws OozieClientException {
+        return (new OozieConfiguration(RestConstants.ADMIN_CONFIG_RESOURCE)).call();
+    }
+
+    public Properties getProperties() throws OozieClientException {
+        return (new OozieConfiguration(RestConstants.ADMIN_JAVA_SYS_PROPS_RESOURCE)).call();
+    }
+
+    @Override
+    protected Authenticator getAuthenticator() throws OozieClientException {
+        return authenticator;
+    }
+
+    @Override
+    protected HttpURLConnection createConnection(URL url, final String method)
+        throws IOException, OozieClientException {
+
+        final URL decoratedUrl = decorateUrlWithUser(url);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("ProxyOozieClient.createConnection: u=" + url + ", m=" + method);
+        }
+
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        try {
+            return currentUser.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
+                public HttpURLConnection run() throws Exception {
+                    HttpURLConnection conn = ProxyOozieClient.super.createConnection(decoratedUrl, method);
+
+                    int connectTimeout = Integer.valueOf(
+                            RuntimeProperties.get().getProperty("oozie.connect.timeout", "1000"));
+                    conn.setConnectTimeout(connectTimeout);
+
+                    int readTimeout = Integer.valueOf(
+                            RuntimeProperties.get().getProperty("oozie.read.timeout", "45000"));
+                    conn.setReadTimeout(readTimeout);
+
+                    return conn;
+                }
+            });
+        } catch (InterruptedException e) {
+            throw new IOException("Could not connect to oozie: " + e.getMessage(), e);
+        }
+    }
+
+    protected URL decorateUrlWithUser(URL url) throws IOException {
+        String strUrl = url.toString();
+
+        if (!strUrl.contains(OozieClient.USER_NAME)) {
+            // decorate the url with the proxy user in request
+            String paramSeparator = (strUrl.contains("?")) ? "&" : "?";
+            strUrl += paramSeparator + OozieClient.USER_NAME + "="
+                    + UserGroupInformation.getLoginUser().getUserName();
+            // strUrl += "&" + RestConstants.DO_AS_PARAM + "=" + CurrentUser.getUser();
+
+            url = new URL(strUrl);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Decorated url with user info: " + url);
+            }
+        }
+
+        return url;
+    }
+
+    private class OozieConfiguration extends ClientCallable<Properties> {
+
+        public OozieConfiguration(String resource) {
+            super("GET", RestConstants.ADMIN, resource, NONE);
+        }
+
+        @Override
+        protected Properties call(HttpURLConnection conn)
+            throws IOException, OozieClientException {
+            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
+            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
+                Reader reader = new InputStreamReader(conn.getInputStream(), "UTF_8");
+                JSONObject json = (JSONObject) JSONValue.parse(reader);
+                Properties props = new Properties();
+                props.putAll(json);
+                return props;
+            } else {
+                handleError(conn);
+                return null;
+            }
+        }
+    }
+
+    @Override
+    public SYSTEM_MODE getSystemMode() throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<SYSTEM_MODE>() {
+
+                public SYSTEM_MODE call() throws Exception {
+                    return ProxyOozieClient.super.getSystemMode();
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public String submit(final Properties conf) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<String>() {
+
+                public String call() throws Exception {
+                    return ProxyOozieClient.super.submit(conf);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public String dryrun(final Properties conf) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<String>() {
+
+                public String call() throws Exception {
+                    return ProxyOozieClient.super.dryrun(conf);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+
+    }
+
+    @Override
+    public void start(final String jobId) throws OozieClientException {
+        try {
+            doAs(CurrentUser.getUser(), new Callable<Object>() {
+
+                public String call() throws Exception {
+                    ProxyOozieClient.super.start(jobId);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public String run(final Properties conf) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<String>() {
+
+                public String call() throws Exception {
+                    return ProxyOozieClient.super.run(conf);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public void reRun(final String jobId, final Properties conf) throws OozieClientException {
+        try {
+            doAs(CurrentUser.getUser(), new Callable<Object>() {
+
+                public Object call() throws Exception {
+                    ProxyOozieClient.super.reRun(jobId, conf);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public void suspend(final String jobId) throws OozieClientException {
+        try {
+            doAs(CurrentUser.getUser(), new Callable<Object>() {
+
+                public Object call() throws Exception {
+                    ProxyOozieClient.super.suspend(jobId);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public void resume(final String jobId) throws OozieClientException {
+        try {
+            doAs(CurrentUser.getUser(), new Callable<Object>() {
+
+                public Object call() throws Exception {
+                    ProxyOozieClient.super.resume(jobId);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public void kill(final String jobId) throws OozieClientException {
+        try {
+            doAs(CurrentUser.getUser(), new Callable<Object>() {
+
+                public Object call() throws Exception {
+                    ProxyOozieClient.super.kill(jobId);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public void change(final String jobId, final String changeValue) throws OozieClientException {
+        try {
+            doAs(CurrentUser.getUser(), new Callable<Object>() {
+
+                public Object call() throws Exception {
+                    ProxyOozieClient.super.change(jobId, changeValue);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public WorkflowJob getJobInfo(final String jobId) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<WorkflowJob>() {
+
+                public WorkflowJob call() throws Exception {
+                    return ProxyOozieClient.super.getJobInfo(jobId);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public WorkflowJob getJobInfo(final String jobId, final int start, final int len)
+        throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<WorkflowJob>() {
+
+                public WorkflowJob call() throws Exception {
+                    return ProxyOozieClient.super.getJobInfo(jobId, start, len);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public WorkflowAction getWorkflowActionInfo(final String actionId)
+        throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<WorkflowAction>() {
+
+                public WorkflowAction call() throws Exception {
+                    return ProxyOozieClient.super.getWorkflowActionInfo(actionId);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public String getJobLog(final String jobId) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<String>() {
+
+                public String call() throws Exception {
+                    return ProxyOozieClient.super.getJobLog(jobId);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public void getJobLog(final String jobId, final String logRetrievalType,
+                          final String logRetrievalScope, final PrintStream ps)
+        throws OozieClientException {
+        try {
+            doAs(CurrentUser.getUser(), new Callable<Object>() {
+
+                public Object call() throws Exception {
+                    ProxyOozieClient.super.getJobLog(jobId, logRetrievalType, logRetrievalScope, ps);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public String getJobDefinition(final String jobId) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<String>() {
+
+                public String call() throws Exception {
+                    return ProxyOozieClient.super.getJobDefinition(jobId);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public BundleJob getBundleJobInfo(final String jobId) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<BundleJob>() {
+
+                public BundleJob call() throws Exception {
+                    return ProxyOozieClient.super.getBundleJobInfo(jobId);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public CoordinatorJob getCoordJobInfo(final String jobId) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<CoordinatorJob>() {
+
+                public CoordinatorJob call() throws Exception {
+                    return ProxyOozieClient.super.getCoordJobInfo(jobId);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public CoordinatorJob getCoordJobInfo(final String jobId, final String filter,
+                                          final int start, final int len)
+        throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<CoordinatorJob>() {
+
+                public CoordinatorJob call() throws Exception {
+                    return ProxyOozieClient.super.getCoordJobInfo(jobId, filter, start, len);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public CoordinatorAction getCoordActionInfo(final String actionId) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<CoordinatorAction>() {
+
+                public CoordinatorAction call() throws Exception {
+                    return ProxyOozieClient.super.getCoordActionInfo(actionId);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public List<CoordinatorAction> reRunCoord(final String jobId, final String rerunType,
+                                              final String scope, final boolean refresh,
+                                              final boolean noCleanup)
+        throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<List<CoordinatorAction>>() {
+
+                public List<CoordinatorAction> call() throws Exception {
+                    return ProxyOozieClient.super.reRunCoord(jobId, rerunType, scope, refresh, noCleanup);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public Void reRunBundle(final String jobId, final String coordScope, final String dateScope,
+                            final boolean refresh, final boolean noCleanup)
+        throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<Void>() {
+
+                public Void call() throws Exception {
+                    return ProxyOozieClient.super.reRunBundle(jobId, coordScope, dateScope, refresh, noCleanup);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public List<WorkflowJob> getJobsInfo(final String filter, final int start, final int len)
+        throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<List<WorkflowJob>>() {
+
+                public List<WorkflowJob> call() throws Exception {
+                    return ProxyOozieClient.super.getJobsInfo(filter, start, len);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public List<WorkflowJob> getJobsInfo(final String filter) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<List<WorkflowJob>>() {
+
+                public List<WorkflowJob> call() throws Exception {
+                    return ProxyOozieClient.super.getJobsInfo(filter);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public void getSlaInfo(final int start, final int len, final String filter) throws OozieClientException {
+        try {
+            doAs(CurrentUser.getUser(), new Callable<Object>() {
+
+                public Object call() throws Exception {
+                    ProxyOozieClient.super.getSlaInfo(start, len, filter);
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public String getJobId(final String externalId) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<String>() {
+
+                public String call() throws Exception {
+                    return ProxyOozieClient.super.getJobId(externalId);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public List<CoordinatorJob> getCoordJobsInfo(final String filter, final int start,
+                                                 final int len) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<List<CoordinatorJob>>() {
+
+                public List<CoordinatorJob> call() throws Exception {
+                    return ProxyOozieClient.super.getCoordJobsInfo(filter, start, len);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+
+    @Override
+    public List<BundleJob> getBundleJobsInfo(final String filter, final int start,
+                                             final int len) throws OozieClientException {
+        try {
+            return doAs(CurrentUser.getUser(), new Callable<List<BundleJob>>() {
+                public List<BundleJob> call() throws Exception {
+                    return ProxyOozieClient.super.getBundleJobsInfo(filter, start, len);
+                }
+            });
+        } catch (Exception e) {
+            throw new OozieClientException(OozieClientException.AUTHENTICATION, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index c6485cd..871c63f 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -52,6 +52,7 @@ public class FalconPostProcessingTest {
                             "-" + Arg.FEED_INSTANCE_PATHS.getOptionName(),
                             "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
                             "-" + Arg.WORKFLOW_ID.getOptionName(), "workflow-01-00",
+                            "-" + Arg.WORKFLOW_USER.getOptionName(), "falcon",
                             "-" + Arg.RUN_ID.getOptionName(), "1",
                             "-" + Arg.NOMINAL_TIME.getOptionName(), "2011-01-01-01-00",
                             "-" + Arg.TIMESTAMP.getOptionName(), "2012-01-01-01-00",
@@ -150,6 +151,10 @@ public class FalconPostProcessingTest {
                 "agg-coord");
         Assert.assertEquals(m.getString(Arg.WORKFLOW_ID.getOptionName()),
                 "workflow-01-00");
+        String workflowUser = m.getString(Arg.WORKFLOW_USER.getOptionName());
+        if (workflowUser != null) { // in case of user message, its NULL
+            Assert.assertEquals(workflowUser, "falcon");
+        }
         Assert.assertEquals(m.getString(Arg.RUN_ID.getOptionName()), "1");
         Assert.assertEquals(m.getString(Arg.NOMINAL_TIME.getOptionName()),
                 "2011-01-01T01:00Z");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fca01b5..d647a62 100644
--- a/pom.xml
+++ b/pom.xml
@@ -941,11 +941,16 @@
 
             <!--  this is needed for embedded oozie -->
             <dependency>
-                <groupId>org.apache.hive</groupId>
-                <artifactId>hive-exec</artifactId>
-                <version>${hive.version}</version>
+                <groupId>org.apache.hcatalog</groupId>
+                <artifactId>webhcat-java-client</artifactId>
+                <version>${hcatalog.version}</version>
                 <exclusions>
                     <exclusion>
+                        <!-- This implies you cannot use orc files -->
+                        <groupId>com.google.protobuf</groupId>
+                        <artifactId>protobuf-java</artifactId>
+                    </exclusion>
+                    <exclusion>
                         <groupId>org.apache.hbase</groupId>
                         <artifactId>hbase-server</artifactId>
                     </exclusion>
@@ -961,12 +966,6 @@
             </dependency>
 
             <dependency>
-                <groupId>org.apache.hcatalog</groupId>
-                <artifactId>webhcat-java-client</artifactId>
-                <version>${hcatalog.version}</version>
-            </dependency>
-
-            <dependency>
                 <groupId>net.sourceforge.findbugs</groupId>
                 <artifactId>annotations</artifactId>
                 <version>1.3.2</version>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
index 2ba76a7..cf556e5 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
@@ -29,7 +29,14 @@ import org.apache.falcon.util.RuntimeProperties;
 import org.apache.log4j.Logger;
 
 import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status.Family;
@@ -44,8 +51,6 @@ import java.util.Properties;
 public class HTTPChannel extends AbstractChannel {
     private static final Logger LOG = Logger.getLogger(HTTPChannel.class);
 
-    private static final String REMOTE_USER = "Remote-User";
-
     private static final HttpServletRequest DEFAULT_NULL_REQUEST = new NullServletRequest();
 
     private static final Properties DEPLOYMENT_PROPERTIES = DeploymentProperties.get();
@@ -84,8 +89,9 @@ public class HTTPChannel extends AbstractChannel {
 
             ClientResponse response = Client.create(new DefaultClientConfig())
                     .resource(UriBuilder.fromUri(url).build())
-                    .header(REMOTE_USER, user).accept(accept)
-                    .type(mimeType).method(httpMethod, ClientResponse.class,
+                    .queryParam("user.name", user)
+                    .accept(accept).type(mimeType)
+                    .method(httpMethod, ClientResponse.class,
                             (isPost(httpMethod) ? incomingRequest.getInputStream() : null));
             incomingRequest.getInputStream().reset();
 
@@ -186,12 +192,4 @@ public class HTTPChannel extends AbstractChannel {
         }
         return consumes.value()[0];
     }
-
-    private String getProduces(Method method) {
-        Produces produces = method.getAnnotation(Produces.class);
-        if (produces.value() == null) {
-            return MediaType.WILDCARD;
-        }
-        return produces.value()[0];
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java b/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
index f172e82..b4b544c 100644
--- a/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
+++ b/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
@@ -18,82 +18,188 @@
 
 package org.apache.falcon.security;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
 import org.apache.log4j.Logger;
 import org.apache.log4j.NDC;
 
-import javax.servlet.*;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
 /**
  * This enforces authentication as part of the filter before processing the request.
+ * Subclass of {@link AuthenticationFilter}.
  */
-public class BasicAuthFilter implements Filter {
+public class BasicAuthFilter extends AuthenticationFilter {
 
     private static final Logger LOG = Logger.getLogger(BasicAuthFilter.class);
 
-    private static final String GUEST = "guest";
+    /**
+     * Constant for the configuration property that indicates the prefix.
+     */
+    private static final String FALCON_PREFIX = "falcon.http.authentication.";
 
-    private static final Set<String> BLACK_LISTED_USER = new HashSet<String>(
-            Arrays.asList(new String[]{"hdfs", "mapred", "oozie", "falcon"}));
+    /**
+     * Constant for the configuration property that indicates the blacklisted super users for falcon.
+     */
+    private static final String BLACK_LISTED_USERS_KEY = FALCON_PREFIX + "blacklisted.users";
 
-    private boolean isSecure;
+    /**
+     * An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
+     * before invoking the actual resource.
+     */
+    private HttpServlet optionsServlet;
+    private Set<String> blackListedUsers;
 
+    /**
+     * Initialize the filter.
+     *
+     * @param filterConfig filter configuration.
+     * @throws ServletException thrown if the filter could not be initialized.
+     */
     @Override
     public void init(FilterConfig filterConfig) throws ServletException {
-        String secure = StartupProperties.get().getProperty("security.enabled", "true");
-        this.isSecure = Boolean.parseBoolean(secure);
+        LOG.info("BasicAuthFilter initialization started");
+        super.init(filterConfig);
+
+        optionsServlet = new HttpServlet() {};
+        optionsServlet.init();
+
+        initializeBlackListedUsers();
     }
 
+    private void initializeBlackListedUsers() {
+        blackListedUsers = new HashSet<String>();
+        String blackListedUserConfig = StartupProperties.get().getProperty(BLACK_LISTED_USERS_KEY);
+        if (!StringUtils.isEmpty(blackListedUserConfig)) {
+            blackListedUsers.addAll(Arrays.asList(blackListedUserConfig.split(",")));
+        }
+    }
+
+    /**
+     * Returns the configuration from Oozie configuration to be used by the authentication filter.
+     * <p/>
+     * All properties from Oozie configuration which name starts with {@link #FALCON_PREFIX} will
+     * be returned. The keys of the returned properties are trimmed from the {@link #FALCON_PREFIX}
+     * prefix, for example the Oozie configuration property name 'oozie.authentication.type' will
+     * be just 'type'.
+     *
+     * @param configPrefix configuration prefix, this parameter is ignored by this implementation.
+     * @param filterConfig filter configuration, this parameter is ignored by this implementation.
+     * @return all Oozie configuration properties prefixed with {@link #FALCON_PREFIX}, without the
+     * prefix.
+     */
     @Override
-    public void doFilter(ServletRequest request,
-                         ServletResponse response,
-                         FilterChain chain) throws IOException, ServletException {
+    protected Properties getConfiguration(String configPrefix, FilterConfig filterConfig) {
+        Properties authProperties = new Properties();
+        Properties configProperties = StartupProperties.get();
+
+        // setting the cookie path to root '/' so it is used for all resources.
+        authProperties.setProperty(AuthenticationFilter.COOKIE_PATH, "/");
 
-        if (!(request instanceof HttpServletRequest) || !(response instanceof HttpServletResponse)) {
-            throw new IllegalStateException("Invalid request/response object");
+        for (Map.Entry entry : configProperties.entrySet()) {
+            String name = (String) entry.getKey();
+            if (name.startsWith(FALCON_PREFIX)) {
+                String value = (String) entry.getValue();
+                name = name.substring(FALCON_PREFIX.length());
+                authProperties.setProperty(name, value);
+            }
         }
-        HttpServletRequest httpRequest = (HttpServletRequest) request;
-        HttpServletResponse httpResponse = (HttpServletResponse) response;
 
-        String user;
-        String requestId = UUID.randomUUID().toString();
+        return authProperties;
+    }
 
-        if (!isSecure) {
-            user = GUEST;
-        } else {
-            user = httpRequest.getHeader("Remote-User");
-        }
+    @Override
+    public void doFilter(final ServletRequest request, final ServletResponse response,
+                         final FilterChain filterChain) throws IOException, ServletException {
+
+        FilterChain filterChainWrapper = new FilterChain() {
+
+            @Override
+            public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
+                throws IOException, ServletException {
+                HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
+
+                if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication
+                    optionsServlet.service(request, response);
+                } else {
+                    final String user = getUserFromRequest(httpRequest);
+                    if (StringUtils.isEmpty(user)) {
+                        ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+                                "User can't be empty");
+                    } else if (blackListedUsers.contains(user)) {
+                        ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+                                "User can't be a superuser:" + BLACK_LISTED_USERS_KEY);
+                    } else {
+                        try {
+                            String requestId = UUID.randomUUID().toString();
+                            NDC.push(user + ":" + httpRequest.getMethod() + "/" + httpRequest.getPathInfo());
+                            NDC.push(requestId);
+                            CurrentUser.authenticate(user);
+                            LOG.info("Request from user: " + user + ", URL=" + getRequestUrl(httpRequest));
 
-        if (user == null || user.isEmpty()) {
-            httpResponse.sendError(Response.Status.BAD_REQUEST.getStatusCode(),
-                    "Remote user header can't be empty");
-        } else if (BLACK_LISTED_USER.contains(user)) {
-            httpResponse.sendError(Response.Status.BAD_REQUEST.getStatusCode(),
-                    "Remote user header can't be superusers:" + BLACK_LISTED_USER);
-        } else {
-            CurrentUser.authenticate(user);
-            try {
-                NDC.push(user + ":" + httpRequest.getMethod() + "/" + httpRequest.getPathInfo());
-                NDC.push(requestId);
-                LOG.info("Request from user: " + user + ", path=" + httpRequest.getPathInfo()
-                        + ", query=" + httpRequest.getQueryString());
-                chain.doFilter(request, response);
-            } finally {
-                NDC.pop();
-                NDC.pop();
+                            filterChain.doFilter(servletRequest, servletResponse);
+                        } finally {
+                            NDC.pop();
+                            NDC.pop();
+                        }
+                    }
+                }
             }
-        }
+
+            private String getUserFromRequest(HttpServletRequest httpRequest) {
+                String user = httpRequest.getRemoteUser(); // this is available from wrapper in super class
+                if (!StringUtils.isEmpty(user)) {
+                    return user;
+                }
+
+                user = httpRequest.getParameter("user.name"); // available in query-param
+                if (!StringUtils.isEmpty(user)) {
+                    return user;
+                }
+
+                user = httpRequest.getHeader("Remote-User"); // backwards-compatibility
+                if (!StringUtils.isEmpty(user)) {
+                    return user;
+                }
+
+                return null;
+            }
+
+            private String getRequestUrl(HttpServletRequest request) {
+                StringBuffer url = request.getRequestURL();
+                if (request.getQueryString() != null) {
+                    url.append("?").append(request.getQueryString());
+                }
+
+                return url.toString();
+            }
+        };
+
+        super.doFilter(request, response, filterChainWrapper);
     }
 
     @Override
     public void destroy() {
+        if (optionsServlet != null) {
+            optionsServlet.destroy();
+        }
+
+        super.destroy();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/main/java/org/apache/falcon/security/RemoteUserInHeaderBasedAuthenticationHandler.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/RemoteUserInHeaderBasedAuthenticationHandler.java b/prism/src/main/java/org/apache/falcon/security/RemoteUserInHeaderBasedAuthenticationHandler.java
new file mode 100644
index 0000000..1d32e86
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/security/RemoteUserInHeaderBasedAuthenticationHandler.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.server.AuthenticationToken;
+import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+/**
+ * This class is for backwards compatibility with clients who send Remote-User in the request
+ * header else delegates to PseudoAuthenticationHandler.
+ *
+ * This is a temporary solution until Falcon clients (0.3) are deprecated.
+ */
+public class RemoteUserInHeaderBasedAuthenticationHandler extends PseudoAuthenticationHandler {
+
+    @Override
+    public AuthenticationToken authenticate(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, AuthenticationException {
+
+        String userName = request.getHeader("Remote-User");
+        if (StringUtils.isEmpty(userName)) {
+            return super.authenticate(request, response);
+        } else {
+            return new AuthenticationToken(userName, userName, getType());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
index 6ac926d..7e2a6c1 100644
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
@@ -26,6 +26,7 @@ import org.apache.falcon.rerun.event.RerunEvent.RerunType;
 import org.apache.falcon.rerun.handler.AbstractRerunHandler;
 import org.apache.falcon.rerun.handler.RerunHandlerFactory;
 import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.log4j.Logger;
@@ -40,13 +41,14 @@ import java.util.Date;
 public class FalconTopicSubscriber implements MessageListener, ExceptionListener {
     private static final Logger LOG = Logger.getLogger(FalconTopicSubscriber.class);
 
-    private TopicSubscriber subscriber;
-    private String implementation;
-    private String userName;
-    private String password;
-    private String url;
-    private String topicName;
+    private final String implementation;
+    private final String userName;
+    private final String password;
+    private final String url;
+    private final String topicName;
+
     private Connection connection;
+    private TopicSubscriber subscriber;
 
     private AbstractRerunHandler retryHandler = RerunHandlerFactory.getRerunHandler(RerunType.RETRY);
     private AbstractRerunHandler latedataHandler = RerunHandlerFactory.getRerunHandler(RerunType.LATE);
@@ -62,8 +64,7 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
 
     public void startSubscriber() throws FalconException {
         try {
-            connection = createAndGetConnection(implementation, userName,
-                    password, url);
+            connection = createAndGetConnection(implementation, userName, password, url);
             TopicSession session = (TopicSession) connection.createSession(
                     false, Session.AUTO_ACKNOWLEDGE);
             Topic destination = session.createTopic(topicName);
@@ -72,8 +73,7 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
             connection.setExceptionListener(this);
             connection.start();
         } catch (Exception e) {
-            LOG.error("Error starting subscriber of topic: " + this.toString(),
-                    e);
+            LOG.error("Error starting subscriber of topic: " + this.toString(), e);
             throw new FalconException(e);
         }
     }
@@ -82,40 +82,41 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
     public void onMessage(Message message) {
         MapMessage mapMessage = (MapMessage) message;
         try {
-            debug(mapMessage);
+            if (LOG.isDebugEnabled()) {debug(mapMessage); }
             String cluster = mapMessage.getString(ARG.cluster.getArgName());
             String entityName = mapMessage.getString(ARG.entityName.getArgName());
             String entityType = mapMessage.getString(ARG.entityType.getArgName());
             String workflowId = mapMessage.getString(ARG.workflowId.getArgName());
+            String workflowUser = mapMessage.getString(ARG.workflowUser.getArgName());
             String runId = mapMessage.getString(ARG.runId.getArgName());
             String nominalTime = mapMessage.getString(ARG.nominalTime.getArgName());
             String status = mapMessage.getString(ARG.status.getArgName());
             String operation = mapMessage.getString(ARG.operation.getArgName());
 
+            CurrentUser.authenticate(workflowUser);
             AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
             InstancesResult result = wfEngine.getJobDetails(cluster, workflowId);
             Date startTime = result.getInstances()[0].startTime;
             Date endTime = result.getInstances()[0].endTime;
             Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
+
             if (status.equalsIgnoreCase("FAILED")) {
                 retryHandler.handleRerun(cluster, entityType, entityName,
-                        nominalTime, runId, workflowId,
+                        nominalTime, runId, workflowId, workflowUser,
                         System.currentTimeMillis());
 
                 GenericAlert.instrumentFailedInstance(cluster, entityType,
-                        entityName, nominalTime, workflowId, runId, operation,
-                        SchemaHelper.formatDateUTC(startTime),
-                        "", "", duration);
+                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
+                        SchemaHelper.formatDateUTC(startTime), "", "", duration);
 
             } else if (status.equalsIgnoreCase("SUCCEEDED")) {
                 latedataHandler.handleRerun(cluster, entityType, entityName,
-                        nominalTime, runId, workflowId,
+                        nominalTime, runId, workflowId, workflowUser,
                         System.currentTimeMillis());
 
                 GenericAlert.instrumentSucceededInstance(cluster, entityType,
-                        entityName, nominalTime, workflowId, runId, operation,
-                        SchemaHelper.formatDateUTC(startTime),
-                        duration);
+                        entityName, nominalTime, workflowId, workflowUser, runId, operation,
+                        SchemaHelper.formatDateUTC(startTime), duration);
 
                 notifySLAService(cluster, entityName, entityType, nominalTime, duration);
             }
@@ -143,17 +144,14 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
     }
 
     private void debug(MapMessage mapMessage) throws JMSException {
-        if (LOG.isDebugEnabled()) {
-            StringBuffer buff = new StringBuffer();
-            buff.append("Received:{");
-            for (ARG arg : ARG.values()) {
-                buff.append(arg.getArgName()).append('=').
-                        append(mapMessage.getString(arg.getArgName())).
-                        append(", ");
-            }
-            buff.append("}");
-            LOG.debug(buff);
+        StringBuilder buff = new StringBuilder();
+        buff.append("Received:{");
+        for (ARG arg : ARG.values()) {
+            buff.append(arg.getArgName()).append('=')
+                .append(mapMessage.getString(arg.getArgName())).append(", ");
         }
+        buff.append("}");
+        LOG.debug(buff);
     }
 
     @Override
@@ -164,11 +162,14 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
     public void closeSubscriber() throws FalconException {
         try {
             LOG.info("Closing subscriber on topic : " + this.topicName);
-            subscriber.close();
-            connection.close();
+            if (subscriber != null) {
+                subscriber.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
         } catch (JMSException e) {
-            LOG.error("Error closing subscriber of topic: " + this.toString(),
-                    e);
+            LOG.error("Error closing subscriber of topic: " + this.toString(), e);
             throw new FalconException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java b/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
index 1cd7776..9ae1ad1 100644
--- a/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
+++ b/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
@@ -62,6 +62,8 @@ public class ProcessSubscriberService implements FalconService {
 
     @Override
     public void destroy() throws FalconException {
-        subscriber.closeSubscriber();
+        if (subscriber != null) { // in case there was an exception while starting subscriber
+            subscriber.closeSubscriber();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java b/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java
index 1db71fd..919f821 100644
--- a/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java
+++ b/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java
@@ -28,8 +28,8 @@ public class GenericAlertTest {
 
     @Test
     public void testWfInstanceFailedAlert() throws Exception {
-        GenericAlert.instrumentFailedInstance("cluster", "process", "agg-coord", "120:df", "ef-id", "1",
-                "DELETE", "now", "error", "none", 1242);
+        GenericAlert.instrumentFailedInstance("cluster", "process", "agg-coord", "120:df",
+                "ef-id", "wf-user", "1", "DELETE", "now", "error", "none", 1242);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java b/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
index f1536f4..9b1d42a 100644
--- a/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java
@@ -34,9 +34,7 @@ import javax.jms.*;
  */
 public class FalconTopicSubscriberTest {
 
-    private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
-    // private static final String BROKER_URL =
-    // "tcp://localhost:61616?daemon=true";
+    private static final String BROKER_URL = "vm://localhost";
     private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
     private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC";
     private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC";
@@ -78,8 +76,7 @@ public class FalconTopicSubscriberTest {
         MapMessage mapMessage = session.createMapMessage();
         message.getKeyValueMap().put(ARG.status, "FAILED");
         for (ARG arg : ARG.values()) {
-            mapMessage.setString(arg.getPropName(), message
-                    .getKeyValueMap().get(arg));
+            mapMessage.setString(arg.getPropName(), message.getKeyValueMap().get(arg));
         }
         producer.send(mapMessage);
     }
@@ -101,6 +98,7 @@ public class FalconTopicSubscriberTest {
         message.getKeyValueMap().put(ARG.workflowId, "workflow-" + i);
         message.getKeyValueMap().put(ARG.topicName, TOPIC_NAME);
         message.getKeyValueMap().put(ARG.status, "SUCCEEDED");
+        message.getKeyValueMap().put(ARG.workflowUser, "falcon");
         return message;
     }
 


Mime
View raw message