From: venkatesh@apache.org
To: commits@falcon.incubator.apache.org
Reply-To: dev@falcon.incubator.apache.org
Date: Sun, 16 Feb 2014 03:31:24 -0000
In-Reply-To: <3e45366aeae04434a4695dfd37a2ce59@git.apache.org>
References: <3e45366aeae04434a4695dfd37a2ce59@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [3/5] FALCON-11 Add support for security in Falcon.
Contributed by Venkatesh Seetharam X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java index 48d4589..0c338da 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java +++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java @@ -25,6 +25,7 @@ import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.resource.InstancesResult.Instance; import org.apache.falcon.resource.InstancesResult.InstanceAction; import org.apache.hadoop.conf.Configuration; @@ -48,15 +49,12 @@ public final class LogProvider { Cluster clusterObj = ConfigurationStore.get().get( EntityType.CLUSTER, instance.cluster); - String resolvedRunId = "-"; try { - FileSystem fs = FileSystem.get( - new Path(ClusterHelper.getStorageUrl(clusterObj)).toUri(), - new Configuration()); - resolvedRunId = getResolvedRunId(fs, clusterObj, entity, instance, - runId); - // if runId param is not resolved, i.e job is killed or not started - // or running + Configuration conf = ClusterHelper.getConfiguration(clusterObj); + // fs on behalf of the end user. + FileSystem fs = HadoopClientFactory.get().createFileSystem(conf); + String resolvedRunId = getResolvedRunId(fs, clusterObj, entity, instance, runId); + // if runId param is not resolved, i.e job is killed or not started or running if (resolvedRunId.equals("-") && StringUtils.isEmpty(instance.logFile)) { instance.logFile = "-"; @@ -107,13 +105,13 @@ public final class LogProvider { } private Instance populateActionLogUrls(FileSystem fs, Cluster cluster, - Entity entity, Instance instance, String formatedRunId) + Entity entity, Instance instance, String formattedRunId) throws FalconException, OozieClientException, IOException { Path actionPaths = new Path(ClusterHelper.getStorageUrl(cluster), EntityUtil.getLogPath(cluster, entity) + "/job-" + EntityUtil.fromUTCtoURIDate(instance.instance) + "/" - + formatedRunId + "/*"); + + formattedRunId + "/*"); FileStatus[] actions = fs.globStatus(actionPaths); InstanceAction[] instanceActions = new InstanceAction[actions.length - 1]; instance.actions = instanceActions; @@ -124,7 +122,7 @@ public final class LogProvider { ClusterHelper.getStorageUrl(cluster), EntityUtil.getLogPath(cluster, entity) + "/job-" + EntityUtil.fromUTCtoURIDate(instance.instance) + "/" - + formatedRunId, file.getPath().getName()); + + formattedRunId, file.getPath().getName()); if (filePath.getName().equals("oozie.log")) { instance.logFile = dfsBrowserUrl; continue; http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java index 37f8cfa..63c16ad 100644 --- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java +++ 
b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java @@ -26,6 +26,7 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.Interfacetype; +import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.util.StartupProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -94,14 +95,13 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener } LOG.debug("Copying libs from " + src); - Configuration conf = ClusterHelper.getConfiguration(cluster); - conf.setInt("ipc.client.connect.max.retries", 10); - FileSystem fs = null; + FileSystem fs; try { - fs = FileSystem.get(conf); + fs = getFileSystem(cluster); + fs.getConf().set("dfs.umaskmode", "022"); // drwxr-xr-x } catch (Exception e) { throw new FalconException("Unable to connect to HDFS: " - + ClusterHelper.getStorageUrl(cluster)); + + ClusterHelper.getStorageUrl(cluster), e); } if (!fs.exists(target)) { fs.mkdirs(target); @@ -137,6 +137,15 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener } } + // the dir is owned by Falcon but world-readable + private static FileSystem getFileSystem(Cluster cluster) + throws FalconException, IOException { + Configuration conf = ClusterHelper.getConfiguration(cluster); + conf.setInt("ipc.client.connect.max.retries", 10); + + return HadoopClientFactory.get().createFileSystem(conf); + } + @Override public void onAdd(Entity entity, boolean ignoreFailure) throws FalconException { if (entity.getEntityType() != EntityType.CLUSTER) { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java index 3f9256c..c19c89c 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java @@ -23,6 +23,7 @@ import org.apache.falcon.logging.LogMover; import org.apache.falcon.messaging.MessageProducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; @@ -61,7 +62,8 @@ public class FalconPostProcessing extends Configured implements Tool { WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie"), USER_SUBFLOW_ID("subflowId", "external id of user workflow"), USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"), - LOG_DIR("logDir", "log dir where job logs are copied"); + LOG_DIR("logDir", "log dir where job logs are copied"), + WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"); private String name; private String description; @@ -96,7 +98,7 @@ public class FalconPostProcessing extends Configured implements Tool { LOG.info("Sending user message " + cmd); invokeUserMessageProducer(cmd); - //LogMover doesnt throw exception, a failed logmover will not fail the user workflow + //LogMover doesn't throw exception, a failed log mover will not fail the user workflow LOG.info("Moving logs " + 
cmd); invokeLogProducer(cmd); @@ -155,11 +157,18 @@ public class FalconPostProcessing extends Configured implements Tool { addArg(args, cmd, Arg.FEED_NAMES); addArg(args, cmd, Arg.FEED_INSTANCE_PATHS); addArg(args, cmd, Arg.LOG_FILE); + addArg(args, cmd, Arg.WORKFLOW_USER); MessageProducer.main(args.toArray(new String[0])); } private void invokeLogProducer(CommandLine cmd) throws Exception { + // todo: need to move this out to Falcon in-process + if (UserGroupInformation.isSecurityEnabled()) { + LOG.info("Unable to move logs as security is enabled."); + return; + } + List args = new ArrayList(); addArg(args, cmd, Arg.WF_ENGINE_URL); addArg(args, cmd, Arg.ENTITY_TYPE); @@ -204,6 +213,8 @@ public class FalconPostProcessing extends Configured implements Tool { addOption(options, Arg.USER_SUBFLOW_ID); addOption(options, Arg.USER_WORKFLOW_ENGINE, false); addOption(options, Arg.LOG_DIR); + addOption(options, Arg.WORKFLOW_USER); + return new GnuParser().parse(options, arguments); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java index b757531..d6dd2af 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java @@ -24,8 +24,7 @@ import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.log4j.Logger; -import org.apache.oozie.client.CustomOozieClient; -import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.ProxyOozieClient; import java.util.concurrent.ConcurrentHashMap; @@ -35,42 +34,43 @@ import java.util.concurrent.ConcurrentHashMap; public final class OozieClientFactory { private static final Logger LOG = Logger.getLogger(OozieClientFactory.class); - - private static final ConcurrentHashMap CACHE = - new ConcurrentHashMap(); private static final String LOCAL_OOZIE = "local"; + + private static final ConcurrentHashMap CACHE = + new ConcurrentHashMap(); private static volatile boolean localInitialized = false; private OozieClientFactory() {} - public static synchronized OozieClient get(Cluster cluster) + public static synchronized ProxyOozieClient get(Cluster cluster) throws FalconException { assert cluster != null : "Cluster cant be null"; String oozieUrl = ClusterHelper.getOozieUrl(cluster); if (!CACHE.containsKey(oozieUrl)) { - OozieClient ref = getClientRef(oozieUrl); + ProxyOozieClient ref = getClientRef(oozieUrl); LOG.info("Caching Oozie client object for " + oozieUrl); CACHE.putIfAbsent(oozieUrl, ref); } + return CACHE.get(oozieUrl); } - public static OozieClient get(String cluster) throws FalconException { - return get((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, cluster)); + public static ProxyOozieClient get(String clusterName) throws FalconException { + return get((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, clusterName)); } - private static OozieClient getClientRef(String oozieUrl) + private static ProxyOozieClient getClientRef(String oozieUrl) throws FalconException { if (LOCAL_OOZIE.equals(oozieUrl)) { return getLocalOozieClient(); } else { - return new 
CustomOozieClient(oozieUrl); + return new ProxyOozieClient(oozieUrl); } } - private static OozieClient getLocalOozieClient() throws FalconException { + private static ProxyOozieClient getLocalOozieClient() throws FalconException { try { if (!localInitialized) { //LocalOozie.start(); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java index 068e980..a930bb7 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java @@ -24,6 +24,7 @@ import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -56,7 +57,7 @@ public class OozieHouseKeepingService implements WorkflowEngineActionListener { LOG.info("Deleting entity path " + entityPath + " on cluster " + clusterName); Configuration conf = ClusterHelper.getConfiguration(cluster); - FileSystem fs = FileSystem.get(conf); + FileSystem fs = HadoopClientFactory.get().createFileSystem(conf); if (fs.exists(entityPath) && !fs.delete(entityPath, true)) { throw new FalconException("Unable to cleanup entity path: " + entityPath); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 71ff430..cea73bd 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -27,6 +27,7 @@ import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.*; import org.apache.falcon.entity.v0.Frequency.TimeUnit; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesResult.Instance; @@ -54,8 +55,7 @@ import java.util.Map.Entry; */ public class OozieWorkflowEngine extends AbstractWorkflowEngine { - private static final Logger LOG = Logger - .getLogger(OozieWorkflowEngine.class); + private static final Logger LOG = Logger.getLogger(OozieWorkflowEngine.class); public static final String ENGINE = "oozie"; private static final BundleJob MISSING = new NullBundleJob(); @@ -130,8 +130,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { private void commitStagingPath(String cluster, String path) throws FalconException { path = StringUtils.removeStart(path, "${nameNode}"); - FileSystem fs = - ClusterHelper.getFileSystem((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, cluster)); + Cluster clusterEntity = 
ConfigurationStore.get().get(EntityType.CLUSTER, cluster); + FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(clusterEntity)); try { fs.create(new Path(path, EntityUtil.SUCCEEDED_FILE_NAME)).close(); } catch (IOException e) { @@ -187,10 +187,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } //Return all bundles for the entity in the requested cluster - private List findBundles(Entity entity, String cluster) throws FalconException { + private List findBundles(Entity entity, String clusterName) throws FalconException { + try { - OozieClient client = OozieClientFactory.get(cluster); - List jobs = client.getBundleJobsInfo( + List jobs = OozieClientFactory.get(clusterName).getBundleJobsInfo( OozieClient.FILTER_NAME + "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256); if (jobs != null) { List filteredJobs = new ArrayList(); @@ -317,22 +317,23 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return "SUCCESS"; } - private void killBundle(String cluster, BundleJob job) throws FalconException { - OozieClient client = OozieClientFactory.get(cluster); + private void killBundle(String clusterName, BundleJob job) throws FalconException { + ProxyOozieClient client = OozieClientFactory.get(clusterName); try { //kill all coords for (CoordinatorJob coord : job.getCoordinators()) { client.kill(coord.getId()); - LOG.debug("Killed coord " + coord.getId() + " on cluster " + cluster); + LOG.debug("Killed coord " + coord.getId() + " on cluster " + clusterName); } //set end time of bundle - client.change(job.getId(), OozieClient.CHANGE_VALUE_ENDTIME + "=" + SchemaHelper.formatDateUTC(new Date())); - LOG.debug("Changed end time of bundle " + job.getId() + " on cluster " + cluster); + client.change(job.getId(), + OozieClient.CHANGE_VALUE_ENDTIME + "=" + SchemaHelper.formatDateUTC(new Date())); + LOG.debug("Changed end time of bundle " + job.getId() + " on cluster " + clusterName); //kill bundle client.kill(job.getId()); - LOG.debug("Killed bundle " + job.getId() + " on cluster " + cluster); + LOG.debug("Killed bundle " + job.getId() + " on cluster " + clusterName); } catch (OozieClientException e) { throw new FalconException(e); } @@ -383,7 +384,6 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { @Override public InstancesResult getRunningInstances(Entity entity) throws FalconException { - try { WorkflowBuilder builder = WorkflowBuilder.getBuilder( ENGINE, entity); @@ -400,7 +400,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } for (String cluster : clusters) { - OozieClient client = OozieClientFactory.get(cluster); + ProxyOozieClient client = OozieClientFactory.get(cluster); List wfs = getRunningWorkflows(cluster, coordNames); if (wfs != null) { for (WorkflowJob job : wfs) { @@ -476,10 +476,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { private WorkflowJob getWorkflowInfo(String cluster, String wfId) throws FalconException { - - OozieClient client = OozieClientFactory.get(cluster); try { - return client.getJobInfo(wfId); + return OozieClientFactory.get(cluster).getJobInfo(wfId); } catch (OozieClientException e) { throw new FalconException(e); } @@ -556,7 +554,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } List bundles = entry.getValue(); - OozieClient client = OozieClientFactory.get(cluster); + ProxyOozieClient client = OozieClientFactory.get(cluster); List applicableCoords = getApplicableCoords(entity, client, start, end, bundles); 
long unscheduledInstances = 0; boolean isLastCoord = false; @@ -725,18 +723,14 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { + Arrays.toString(statuses)); } - private String getSourceCluster(String cluster, - CoordinatorAction coordinatorAction, Entity entity) + private String getSourceCluster(String cluster, CoordinatorAction coordinatorAction, Entity entity) throws FalconException { - - OozieClient client = OozieClientFactory.get(cluster); - CoordinatorJob coordJob; try { - coordJob = client.getCoordJobInfo(coordinatorAction.getJobId()); + CoordinatorJob coordJob = OozieClientFactory.get(cluster).getCoordJobInfo(coordinatorAction.getJobId()); + return EntityUtil.getWorkflowNameSuffix(coordJob.getAppName(), entity); } catch (OozieClientException e) { throw new FalconException("Unable to get oozie job id:" + e); } - return EntityUtil.getWorkflowNameSuffix(coordJob.getAppName(), entity); } private List getIncludedClusters(Properties props, @@ -776,7 +770,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { for (Map.Entry> entry : bundlesMap.entrySet()) { String cluster = entry.getKey(); List bundles = entry.getValue(); - OozieClient client = OozieClientFactory.get(cluster); + ProxyOozieClient client = OozieClientFactory.get(cluster); List applicableCoords = getApplicableCoords(entity, client, start, end, bundles); List actions = new ArrayList(); @@ -836,8 +830,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } - private List getApplicableCoords(Entity entity, - OozieClient client, Date start, Date end, List bundles) + private List getApplicableCoords(Entity entity, ProxyOozieClient client, + Date start, Date end, List bundles) throws FalconException { List applicableCoords = new ArrayList(); @@ -1113,10 +1107,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { private BundleJob getBundleInfo(String cluster, String bundleId) throws FalconException { - - OozieClient client = OozieClientFactory.get(cluster); try { - return client.getBundleJobInfo(bundleId); + return OozieClientFactory.get(cluster).getBundleJobInfo(bundleId); } catch (OozieClientException e) { throw new FalconException(e); } @@ -1132,9 +1124,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { .append(wfName); } - OozieClient client = OozieClientFactory.get(cluster); try { - return client.getJobsInfo(filter.toString(), 1, 1000); + return OozieClientFactory.get(cluster).getJobsInfo(filter.toString(), 1, 1000); } catch (OozieClientException e) { throw new FalconException(e); } @@ -1144,7 +1135,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { public void reRun(String cluster, String jobId, Properties props) throws FalconException { - OozieClient client = OozieClientFactory.get(cluster); + ProxyOozieClient client = OozieClientFactory.get(cluster); try { WorkflowJob jobInfo = client.getJobInfo(jobId); Properties jobprops = OozieUtils.toProperties(jobInfo.getConf()); @@ -1200,7 +1191,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { public String getWorkflowStatus(String cluster, String jobId) throws FalconException { - OozieClient client = OozieClientFactory.get(cluster); + ProxyOozieClient client = OozieClientFactory.get(cluster); try { if (jobId.endsWith("-W")) { WorkflowJob jobInfo = client.getJobInfo(jobId); @@ -1231,9 +1222,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private String run(String cluster, Properties props) throws FalconException { - OozieClient 
client = OozieClientFactory.get(cluster); try { - String jobId = client.run(props); + String jobId = OozieClientFactory.get(cluster).run(props); LOG.info("Submitted " + jobId + " on cluster " + cluster + " with properties : " + props); return jobId; @@ -1244,9 +1234,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private void suspend(String cluster, String jobId) throws FalconException { - OozieClient client = OozieClientFactory.get(cluster); try { - client.suspend(jobId); + OozieClientFactory.get(cluster).suspend(jobId); assertStatus(cluster, jobId, Status.PREPSUSPENDED, Status.SUSPENDED, Status.SUCCEEDED, Status.FAILED, Status.KILLED); LOG.info("Suspended job " + jobId + " on cluster " + cluster); @@ -1256,9 +1245,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private void resume(String cluster, String jobId) throws FalconException { - OozieClient client = OozieClientFactory.get(cluster); try { - client.resume(jobId); + OozieClientFactory.get(cluster).resume(jobId); assertStatus(cluster, jobId, Status.RUNNING, Status.SUCCEEDED, Status.FAILED, Status.KILLED); LOG.info("Resumed job " + jobId + " on cluster " + cluster); @@ -1268,9 +1256,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private void kill(String cluster, String jobId) throws FalconException { - OozieClient client = OozieClientFactory.get(cluster); try { - client.kill(jobId); + OozieClientFactory.get(cluster).kill(jobId); assertStatus(cluster, jobId, Status.KILLED, Status.SUCCEEDED, Status.FAILED); LOG.info("Killed job " + jobId + " on cluster " + cluster); @@ -1281,10 +1268,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { private void change(String cluster, String jobId, String changeValue) throws FalconException { - try { - OozieClient client = OozieClientFactory.get(cluster); - client.change(jobId, changeValue); + OozieClientFactory.get(cluster).change(jobId, changeValue); LOG.info("Changed bundle/coord " + jobId + ": " + changeValue + " on cluster " + cluster); } catch (OozieClientException e) { @@ -1317,7 +1302,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { // assert that its really changed try { - OozieClient client = OozieClientFactory.get(cluster); + ProxyOozieClient client = OozieClientFactory.get(cluster); CoordinatorJob coord = client.getCoordJobInfo(id); for (int counter = 0; counter < 3; counter++) { Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? 
null @@ -1348,9 +1333,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { @Override public Properties getWorkflowProperties(String cluster, String jobId) throws FalconException { - OozieClient client = OozieClientFactory.get(cluster); try { - WorkflowJob jobInfo = client.getJobInfo(jobId); + WorkflowJob jobInfo = OozieClientFactory.get(cluster).getJobInfo(jobId); String conf = jobInfo.getConf(); return OozieUtils.toProperties(conf); } catch (Exception e) { @@ -1361,12 +1345,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { @Override public InstancesResult getJobDetails(String cluster, String jobId) throws FalconException { - - OozieClient client = OozieClientFactory.get(cluster); Instance[] instances = new Instance[1]; Instance instance = new Instance(); try { - WorkflowJob jobInfo = client.getJobInfo(jobId); + WorkflowJob jobInfo = OozieClientFactory.get(cluster).getJobInfo(jobId); instance.startTime = jobInfo.getStartTime(); if (jobInfo.getStatus().name().equals(Status.RUNNING.name())) { instance.endTime = new Date(); @@ -1380,6 +1362,5 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } catch (Exception e) { throw new FalconException(e); } - } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java deleted file mode 100644 index c55221e..0000000 --- a/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.client; - -import org.apache.falcon.security.CurrentUser; -import org.apache.falcon.util.RuntimeProperties; -import org.apache.log4j.Logger; -import org.apache.oozie.client.rest.RestConstants; -import org.json.simple.JSONObject; -import org.json.simple.JSONValue; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Reader; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -/** - * Wrapped Oozie Client. 
- */ -public class CustomOozieClient extends OozieClient { - - private static final Logger LOG = Logger.getLogger(CustomOozieClient.class); - private static final Map NONE = new HashMap(); - - public CustomOozieClient(String oozieUrl) { - super(oozieUrl); - } - - public Properties getConfiguration() throws OozieClientException { - return (new OozieConfiguration(RestConstants.ADMIN_CONFIG_RESOURCE)).call(); - } - - public Properties getProperties() throws OozieClientException { - return (new OozieConfiguration(RestConstants.ADMIN_JAVA_SYS_PROPS_RESOURCE)).call(); - } - - @Override - protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException { - String strUrl = url.toString(); - if (!strUrl.contains(OozieClient.USER_NAME)) { // decorate the url with the user in request - String paramSeparator = (strUrl.contains("?")) ? "&" : "?"; - strUrl += paramSeparator + OozieClient.USER_NAME + "=" + CurrentUser.getUser(); - url = new URL(strUrl); - if (LOG.isDebugEnabled()) { - LOG.debug("Decorated url with user info: " + url); - } - } - - HttpURLConnection conn = super.createConnection(url, method); - - int connectTimeout = Integer.valueOf(RuntimeProperties.get().getProperty("oozie.connect.timeout", "1000")); - conn.setConnectTimeout(connectTimeout); - - int readTimeout = Integer.valueOf(RuntimeProperties.get().getProperty("oozie.read.timeout", "45000")); - conn.setReadTimeout(readTimeout); - - return conn; - } - - private class OozieConfiguration extends ClientCallable { - - public OozieConfiguration(String resource) { - super("GET", RestConstants.ADMIN, resource, NONE); - } - - @Override - protected Properties call(HttpURLConnection conn) throws IOException, OozieClientException { - conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); - if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { - Reader reader = new InputStreamReader(conn.getInputStream(), "UTF_8"); - JSONObject json = (JSONObject) JSONValue.parse(reader); - Properties props = new Properties(); - props.putAll(json); - return props; - } else { - handleError(conn); - return null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java new file mode 100644 index 0000000..c78a83a --- /dev/null +++ b/oozie/src/main/java/org/apache/oozie/client/ProxyOozieClient.java @@ -0,0 +1,562 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.client; + +import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.security.SecurityUtil; +import org.apache.falcon.util.RuntimeProperties; +import org.apache.hadoop.hdfs.web.KerberosUgiAuthenticator; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.Authenticator; +import org.apache.log4j.Logger; +import org.apache.oozie.client.rest.RestConstants; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.io.Reader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Callable; + +/** + * Wrapped Oozie Client that does proxy the requests. + */ +public class ProxyOozieClient extends AuthOozieClient { + + private static final Logger LOG = Logger.getLogger(ProxyOozieClient.class); + private static final Map NONE = new HashMap(); + + private final Authenticator authenticator = new KerberosUgiAuthenticator(); + + public ProxyOozieClient(String oozieUrl) { + super(oozieUrl, SecurityUtil.getAuthenticationType()); + + if (org.apache.log4j.Logger.getLogger(getClass()).isDebugEnabled()) { + setDebugMode(1); + } + } + + public Properties getConfiguration() throws OozieClientException { + return (new OozieConfiguration(RestConstants.ADMIN_CONFIG_RESOURCE)).call(); + } + + public Properties getProperties() throws OozieClientException { + return (new OozieConfiguration(RestConstants.ADMIN_JAVA_SYS_PROPS_RESOURCE)).call(); + } + + @Override + protected Authenticator getAuthenticator() throws OozieClientException { + return authenticator; + } + + @Override + protected HttpURLConnection createConnection(URL url, final String method) + throws IOException, OozieClientException { + + final URL decoratedUrl = decorateUrlWithUser(url); + if (LOG.isDebugEnabled()) { + LOG.debug("ProxyOozieClient.createConnection: u=" + url + ", m=" + method); + } + + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + try { + return currentUser.doAs(new PrivilegedExceptionAction() { + public HttpURLConnection run() throws Exception { + HttpURLConnection conn = ProxyOozieClient.super.createConnection(decoratedUrl, method); + + int connectTimeout = Integer.valueOf( + RuntimeProperties.get().getProperty("oozie.connect.timeout", "1000")); + conn.setConnectTimeout(connectTimeout); + + int readTimeout = Integer.valueOf( + RuntimeProperties.get().getProperty("oozie.read.timeout", "45000")); + conn.setReadTimeout(readTimeout); + + return conn; + } + }); + } catch (InterruptedException e) { + throw new IOException("Could not connect to oozie: " + e.getMessage(), e); + } + } + + protected URL decorateUrlWithUser(URL url) throws IOException { + String strUrl = url.toString(); + + if (!strUrl.contains(OozieClient.USER_NAME)) { + // decorate the url with the proxy user in request + String paramSeparator = (strUrl.contains("?")) ? 
"&" : "?"; + strUrl += paramSeparator + OozieClient.USER_NAME + "=" + + UserGroupInformation.getLoginUser().getUserName(); + // strUrl += "&" + RestConstants.DO_AS_PARAM + "=" + CurrentUser.getUser(); + + url = new URL(strUrl); + if (LOG.isDebugEnabled()) { + LOG.debug("Decorated url with user info: " + url); + } + } + + return url; + } + + private class OozieConfiguration extends ClientCallable { + + public OozieConfiguration(String resource) { + super("GET", RestConstants.ADMIN, resource, NONE); + } + + @Override + protected Properties call(HttpURLConnection conn) + throws IOException, OozieClientException { + conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); + if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { + Reader reader = new InputStreamReader(conn.getInputStream(), "UTF_8"); + JSONObject json = (JSONObject) JSONValue.parse(reader); + Properties props = new Properties(); + props.putAll(json); + return props; + } else { + handleError(conn); + return null; + } + } + } + + @Override + public SYSTEM_MODE getSystemMode() throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public SYSTEM_MODE call() throws Exception { + return ProxyOozieClient.super.getSystemMode(); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public String submit(final Properties conf) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public String call() throws Exception { + return ProxyOozieClient.super.submit(conf); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public String dryrun(final Properties conf) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public String call() throws Exception { + return ProxyOozieClient.super.dryrun(conf); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + + } + + @Override + public void start(final String jobId) throws OozieClientException { + try { + doAs(CurrentUser.getUser(), new Callable() { + + public String call() throws Exception { + ProxyOozieClient.super.start(jobId); + return null; + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public String run(final Properties conf) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public String call() throws Exception { + return ProxyOozieClient.super.run(conf); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public void reRun(final String jobId, final Properties conf) throws OozieClientException { + try { + doAs(CurrentUser.getUser(), new Callable() { + + public Object call() throws Exception { + ProxyOozieClient.super.reRun(jobId, conf); + return null; + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public void suspend(final String jobId) throws OozieClientException { + try { + doAs(CurrentUser.getUser(), new Callable() { + + public Object call() throws Exception { + ProxyOozieClient.super.suspend(jobId); + return null; + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + 
} + + @Override + public void resume(final String jobId) throws OozieClientException { + try { + doAs(CurrentUser.getUser(), new Callable() { + + public Object call() throws Exception { + ProxyOozieClient.super.resume(jobId); + return null; + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public void kill(final String jobId) throws OozieClientException { + try { + doAs(CurrentUser.getUser(), new Callable() { + + public Object call() throws Exception { + ProxyOozieClient.super.kill(jobId); + return null; + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public void change(final String jobId, final String changeValue) throws OozieClientException { + try { + doAs(CurrentUser.getUser(), new Callable() { + + public Object call() throws Exception { + ProxyOozieClient.super.change(jobId, changeValue); + return null; + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public WorkflowJob getJobInfo(final String jobId) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public WorkflowJob call() throws Exception { + return ProxyOozieClient.super.getJobInfo(jobId); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public WorkflowJob getJobInfo(final String jobId, final int start, final int len) + throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public WorkflowJob call() throws Exception { + return ProxyOozieClient.super.getJobInfo(jobId, start, len); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public WorkflowAction getWorkflowActionInfo(final String actionId) + throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public WorkflowAction call() throws Exception { + return ProxyOozieClient.super.getWorkflowActionInfo(actionId); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public String getJobLog(final String jobId) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public String call() throws Exception { + return ProxyOozieClient.super.getJobLog(jobId); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public void getJobLog(final String jobId, final String logRetrievalType, + final String logRetrievalScope, final PrintStream ps) + throws OozieClientException { + try { + doAs(CurrentUser.getUser(), new Callable() { + + public Object call() throws Exception { + ProxyOozieClient.super.getJobLog(jobId, logRetrievalType, logRetrievalScope, ps); + return null; + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public String getJobDefinition(final String jobId) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public String call() throws Exception { + return ProxyOozieClient.super.getJobDefinition(jobId); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + 
+ @Override + public BundleJob getBundleJobInfo(final String jobId) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public BundleJob call() throws Exception { + return ProxyOozieClient.super.getBundleJobInfo(jobId); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public CoordinatorJob getCoordJobInfo(final String jobId) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public CoordinatorJob call() throws Exception { + return ProxyOozieClient.super.getCoordJobInfo(jobId); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public CoordinatorJob getCoordJobInfo(final String jobId, final String filter, + final int start, final int len) + throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public CoordinatorJob call() throws Exception { + return ProxyOozieClient.super.getCoordJobInfo(jobId, filter, start, len); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public CoordinatorAction getCoordActionInfo(final String actionId) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public CoordinatorAction call() throws Exception { + return ProxyOozieClient.super.getCoordActionInfo(actionId); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public List reRunCoord(final String jobId, final String rerunType, + final String scope, final boolean refresh, + final boolean noCleanup) + throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable>() { + + public List call() throws Exception { + return ProxyOozieClient.super.reRunCoord(jobId, rerunType, scope, refresh, noCleanup); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public Void reRunBundle(final String jobId, final String coordScope, final String dateScope, + final boolean refresh, final boolean noCleanup) + throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public Void call() throws Exception { + return ProxyOozieClient.super.reRunBundle(jobId, coordScope, dateScope, refresh, noCleanup); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public List getJobsInfo(final String filter, final int start, final int len) + throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable>() { + + public List call() throws Exception { + return ProxyOozieClient.super.getJobsInfo(filter, start, len); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public List getJobsInfo(final String filter) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable>() { + + public List call() throws Exception { + return ProxyOozieClient.super.getJobsInfo(filter); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public void getSlaInfo(final int start, final int len, final String filter) throws 
OozieClientException { + try { + doAs(CurrentUser.getUser(), new Callable() { + + public Object call() throws Exception { + ProxyOozieClient.super.getSlaInfo(start, len, filter); + return null; + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public String getJobId(final String externalId) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable() { + + public String call() throws Exception { + return ProxyOozieClient.super.getJobId(externalId); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public List getCoordJobsInfo(final String filter, final int start, + final int len) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable>() { + + public List call() throws Exception { + return ProxyOozieClient.super.getCoordJobsInfo(filter, start, len); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } + + @Override + public List getBundleJobsInfo(final String filter, final int start, + final int len) throws OozieClientException { + try { + return doAs(CurrentUser.getUser(), new Callable>() { + public List call() throws Exception { + return ProxyOozieClient.super.getBundleJobsInfo(filter, start, len); + } + }); + } catch (Exception e) { + throw new OozieClientException(OozieClientException.AUTHENTICATION, e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java index c6485cd..871c63f 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java @@ -52,6 +52,7 @@ public class FalconPostProcessingTest { "-" + Arg.FEED_INSTANCE_PATHS.getOptionName(), "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20", "-" + Arg.WORKFLOW_ID.getOptionName(), "workflow-01-00", + "-" + Arg.WORKFLOW_USER.getOptionName(), "falcon", "-" + Arg.RUN_ID.getOptionName(), "1", "-" + Arg.NOMINAL_TIME.getOptionName(), "2011-01-01-01-00", "-" + Arg.TIMESTAMP.getOptionName(), "2012-01-01-01-00", @@ -150,6 +151,10 @@ public class FalconPostProcessingTest { "agg-coord"); Assert.assertEquals(m.getString(Arg.WORKFLOW_ID.getOptionName()), "workflow-01-00"); + String workflowUser = m.getString(Arg.WORKFLOW_USER.getOptionName()); + if (workflowUser != null) { // in case of user message, its NULL + Assert.assertEquals(workflowUser, "falcon"); + } Assert.assertEquals(m.getString(Arg.RUN_ID.getOptionName()), "1"); Assert.assertEquals(m.getString(Arg.NOMINAL_TIME.getOptionName()), "2011-01-01T01:00Z"); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index fca01b5..d647a62 100644 --- a/pom.xml +++ b/pom.xml @@ -941,11 +941,16 @@ - org.apache.hive - hive-exec - ${hive.version} + org.apache.hcatalog + webhcat-java-client + ${hcatalog.version} + + com.google.protobuf + protobuf-java + + org.apache.hbase hbase-server @@ -961,12 +966,6 @@ - 
org.apache.hcatalog - webhcat-java-client - ${hcatalog.version} - - - net.sourceforge.findbugs annotations 1.3.2 http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java index 2ba76a7..cf556e5 100644 --- a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java +++ b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java @@ -29,7 +29,14 @@ import org.apache.falcon.util.RuntimeProperties; import org.apache.log4j.Logger; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status.Family; @@ -44,8 +51,6 @@ import java.util.Properties; public class HTTPChannel extends AbstractChannel { private static final Logger LOG = Logger.getLogger(HTTPChannel.class); - private static final String REMOTE_USER = "Remote-User"; - private static final HttpServletRequest DEFAULT_NULL_REQUEST = new NullServletRequest(); private static final Properties DEPLOYMENT_PROPERTIES = DeploymentProperties.get(); @@ -84,8 +89,9 @@ public class HTTPChannel extends AbstractChannel { ClientResponse response = Client.create(new DefaultClientConfig()) .resource(UriBuilder.fromUri(url).build()) - .header(REMOTE_USER, user).accept(accept) - .type(mimeType).method(httpMethod, ClientResponse.class, + .queryParam("user.name", user) + .accept(accept).type(mimeType) + .method(httpMethod, ClientResponse.class, (isPost(httpMethod) ? 
incomingRequest.getInputStream() : null)); incomingRequest.getInputStream().reset(); @@ -186,12 +192,4 @@ public class HTTPChannel extends AbstractChannel { } return consumes.value()[0]; } - - private String getProduces(Method method) { - Produces produces = method.getAnnotation(Produces.class); - if (produces.value() == null) { - return MediaType.WILDCARD; - } - return produces.value()[0]; - } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java b/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java index f172e82..b4b544c 100644 --- a/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java +++ b/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java @@ -18,82 +18,188 @@ package org.apache.falcon.security; +import org.apache.commons.lang.StringUtils; import org.apache.falcon.util.StartupProperties; +import org.apache.hadoop.security.authentication.server.AuthenticationFilter; import org.apache.log4j.Logger; import org.apache.log4j.NDC; -import javax.servlet.*; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.Response; import java.io.IOException; import java.util.Arrays; import java.util.HashSet; +import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.UUID; /** * This enforces authentication as part of the filter before processing the request. + * Subclass of {@link AuthenticationFilter}. */ -public class BasicAuthFilter implements Filter { +public class BasicAuthFilter extends AuthenticationFilter { private static final Logger LOG = Logger.getLogger(BasicAuthFilter.class); - private static final String GUEST = "guest"; + /** + * Constant for the configuration property that indicates the prefix. + */ + private static final String FALCON_PREFIX = "falcon.http.authentication."; - private static final Set BLACK_LISTED_USER = new HashSet( - Arrays.asList(new String[]{"hdfs", "mapred", "oozie", "falcon"})); + /** + * Constant for the configuration property that indicates the blacklisted super users for falcon. + */ + private static final String BLACK_LISTED_USERS_KEY = FALCON_PREFIX + "blacklisted.users"; - private boolean isSecure; + /** + * An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication + * before invoking the actual resource. + */ + private HttpServlet optionsServlet; + private Set blackListedUsers; + /** + * Initialize the filter. + * + * @param filterConfig filter configuration. + * @throws ServletException thrown if the filter could not be initialized. 
+ */ @Override public void init(FilterConfig filterConfig) throws ServletException { - String secure = StartupProperties.get().getProperty("security.enabled", "true"); - this.isSecure = Boolean.parseBoolean(secure); + LOG.info("BasicAuthFilter initialization started"); + super.init(filterConfig); + + optionsServlet = new HttpServlet() {}; + optionsServlet.init(); + + initializeBlackListedUsers(); } + private void initializeBlackListedUsers() { + blackListedUsers = new HashSet(); + String blackListedUserConfig = StartupProperties.get().getProperty(BLACK_LISTED_USERS_KEY); + if (!StringUtils.isEmpty(blackListedUserConfig)) { + blackListedUsers.addAll(Arrays.asList(blackListedUserConfig.split(","))); + } + } + + /** + * Returns the configuration from Oozie configuration to be used by the authentication filter. + *

+ * All properties from Oozie configuration which name starts with {@link #FALCON_PREFIX} will + * be returned. The keys of the returned properties are trimmed from the {@link #FALCON_PREFIX} + * prefix, for example the Oozie configuration property name 'oozie.authentication.type' will + * be just 'type'. + * + * @param configPrefix configuration prefix, this parameter is ignored by this implementation. + * @param filterConfig filter configuration, this parameter is ignored by this implementation. + * @return all Oozie configuration properties prefixed with {@link #FALCON_PREFIX}, without the + * prefix. + */ @Override - public void doFilter(ServletRequest request, - ServletResponse response, - FilterChain chain) throws IOException, ServletException { + protected Properties getConfiguration(String configPrefix, FilterConfig filterConfig) { + Properties authProperties = new Properties(); + Properties configProperties = StartupProperties.get(); + + // setting the cookie path to root '/' so it is used for all resources. + authProperties.setProperty(AuthenticationFilter.COOKIE_PATH, "/"); - if (!(request instanceof HttpServletRequest) || !(response instanceof HttpServletResponse)) { - throw new IllegalStateException("Invalid request/response object"); + for (Map.Entry entry : configProperties.entrySet()) { + String name = (String) entry.getKey(); + if (name.startsWith(FALCON_PREFIX)) { + String value = (String) entry.getValue(); + name = name.substring(FALCON_PREFIX.length()); + authProperties.setProperty(name, value); + } } - HttpServletRequest httpRequest = (HttpServletRequest) request; - HttpServletResponse httpResponse = (HttpServletResponse) response; - String user; - String requestId = UUID.randomUUID().toString(); + return authProperties; + } - if (!isSecure) { - user = GUEST; - } else { - user = httpRequest.getHeader("Remote-User"); - } + @Override + public void doFilter(final ServletRequest request, final ServletResponse response, + final FilterChain filterChain) throws IOException, ServletException { + + FilterChain filterChainWrapper = new FilterChain() { + + @Override + public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse) + throws IOException, ServletException { + HttpServletRequest httpRequest = (HttpServletRequest) servletRequest; + + if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication + optionsServlet.service(request, response); + } else { + final String user = getUserFromRequest(httpRequest); + if (StringUtils.isEmpty(user)) { + ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(), + "User can't be empty"); + } else if (blackListedUsers.contains(user)) { + ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(), + "User can't be a superuser:" + BLACK_LISTED_USERS_KEY); + } else { + try { + String requestId = UUID.randomUUID().toString(); + NDC.push(user + ":" + httpRequest.getMethod() + "/" + httpRequest.getPathInfo()); + NDC.push(requestId); + CurrentUser.authenticate(user); + LOG.info("Request from user: " + user + ", URL=" + getRequestUrl(httpRequest)); - if (user == null || user.isEmpty()) { - httpResponse.sendError(Response.Status.BAD_REQUEST.getStatusCode(), - "Remote user header can't be empty"); - } else if (BLACK_LISTED_USER.contains(user)) { - httpResponse.sendError(Response.Status.BAD_REQUEST.getStatusCode(), - "Remote user header can't be superusers:" + BLACK_LISTED_USER); - } else { - CurrentUser.authenticate(user); - try { - 
NDC.push(user + ":" + httpRequest.getMethod() + "/" + httpRequest.getPathInfo()); - NDC.push(requestId); - LOG.info("Request from user: " + user + ", path=" + httpRequest.getPathInfo() - + ", query=" + httpRequest.getQueryString()); - chain.doFilter(request, response); - } finally { - NDC.pop(); - NDC.pop(); + filterChain.doFilter(servletRequest, servletResponse); + } finally { + NDC.pop(); + NDC.pop(); + } + } + } } - } + + private String getUserFromRequest(HttpServletRequest httpRequest) { + String user = httpRequest.getRemoteUser(); // made available by the authentication filter wrapper in the super class + if (!StringUtils.isEmpty(user)) { + return user; + } + + user = httpRequest.getParameter("user.name"); // available as a query parameter + if (!StringUtils.isEmpty(user)) { + return user; + } + + user = httpRequest.getHeader("Remote-User"); // backwards-compatibility + if (!StringUtils.isEmpty(user)) { + return user; + } + + return null; + } + + private String getRequestUrl(HttpServletRequest request) { + StringBuffer url = request.getRequestURL(); + if (request.getQueryString() != null) { + url.append("?").append(request.getQueryString()); + } + + return url.toString(); + } + }; + + super.doFilter(request, response, filterChainWrapper); } @Override public void destroy() { + if (optionsServlet != null) { + optionsServlet.destroy(); + } + + super.destroy(); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/main/java/org/apache/falcon/security/RemoteUserInHeaderBasedAuthenticationHandler.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/security/RemoteUserInHeaderBasedAuthenticationHandler.java b/prism/src/main/java/org/apache/falcon/security/RemoteUserInHeaderBasedAuthenticationHandler.java new file mode 100644 index 0000000..1d32e86 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/security/RemoteUserInHeaderBasedAuthenticationHandler.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.security; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.authentication.server.AuthenticationToken; +import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; + +/** + * This class exists for backwards compatibility with clients that send Remote-User in the request + * header; otherwise it delegates to PseudoAuthenticationHandler. + * + * This is a temporary solution until Falcon clients (0.3) are deprecated.
+ */ +public class RemoteUserInHeaderBasedAuthenticationHandler extends PseudoAuthenticationHandler { + + @Override + public AuthenticationToken authenticate(HttpServletRequest request, HttpServletResponse response) + throws IOException, AuthenticationException { + + String userName = request.getHeader("Remote-User"); + if (StringUtils.isEmpty(userName)) { + return super.authenticate(request, response); + } else { + return new AuthenticationToken(userName, userName, getType()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java index 6ac926d..7e2a6c1 100644 --- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java +++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java @@ -26,6 +26,7 @@ import org.apache.falcon.rerun.event.RerunEvent.RerunType; import org.apache.falcon.rerun.handler.AbstractRerunHandler; import org.apache.falcon.rerun.handler.RerunHandlerFactory; import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.security.CurrentUser; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; import org.apache.log4j.Logger; @@ -40,13 +41,14 @@ import java.util.Date; public class FalconTopicSubscriber implements MessageListener, ExceptionListener { private static final Logger LOG = Logger.getLogger(FalconTopicSubscriber.class); - private TopicSubscriber subscriber; - private String implementation; - private String userName; - private String password; - private String url; - private String topicName; + private final String implementation; + private final String userName; + private final String password; + private final String url; + private final String topicName; + private Connection connection; + private TopicSubscriber subscriber; private AbstractRerunHandler retryHandler = RerunHandlerFactory.getRerunHandler(RerunType.RETRY); private AbstractRerunHandler latedataHandler = RerunHandlerFactory.getRerunHandler(RerunType.LATE); @@ -62,8 +64,7 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener public void startSubscriber() throws FalconException { try { - connection = createAndGetConnection(implementation, userName, - password, url); + connection = createAndGetConnection(implementation, userName, password, url); TopicSession session = (TopicSession) connection.createSession( false, Session.AUTO_ACKNOWLEDGE); Topic destination = session.createTopic(topicName); @@ -72,8 +73,7 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener connection.setExceptionListener(this); connection.start(); } catch (Exception e) { - LOG.error("Error starting subscriber of topic: " + this.toString(), - e); + LOG.error("Error starting subscriber of topic: " + this.toString(), e); throw new FalconException(e); } } @@ -82,40 +82,41 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener public void onMessage(Message message) { MapMessage mapMessage = (MapMessage) message; try { - debug(mapMessage); + if (LOG.isDebugEnabled()) {debug(mapMessage); } String cluster = mapMessage.getString(ARG.cluster.getArgName()); String entityName = 
mapMessage.getString(ARG.entityName.getArgName()); String entityType = mapMessage.getString(ARG.entityType.getArgName()); String workflowId = mapMessage.getString(ARG.workflowId.getArgName()); + String workflowUser = mapMessage.getString(ARG.workflowUser.getArgName()); String runId = mapMessage.getString(ARG.runId.getArgName()); String nominalTime = mapMessage.getString(ARG.nominalTime.getArgName()); String status = mapMessage.getString(ARG.status.getArgName()); String operation = mapMessage.getString(ARG.operation.getArgName()); + CurrentUser.authenticate(workflowUser); AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine(); InstancesResult result = wfEngine.getJobDetails(cluster, workflowId); Date startTime = result.getInstances()[0].startTime; Date endTime = result.getInstances()[0].endTime; Long duration = (endTime.getTime() - startTime.getTime()) * 1000000; + if (status.equalsIgnoreCase("FAILED")) { retryHandler.handleRerun(cluster, entityType, entityName, - nominalTime, runId, workflowId, + nominalTime, runId, workflowId, workflowUser, System.currentTimeMillis()); GenericAlert.instrumentFailedInstance(cluster, entityType, - entityName, nominalTime, workflowId, runId, operation, - SchemaHelper.formatDateUTC(startTime), - "", "", duration); + entityName, nominalTime, workflowId, workflowUser, runId, operation, + SchemaHelper.formatDateUTC(startTime), "", "", duration); } else if (status.equalsIgnoreCase("SUCCEEDED")) { latedataHandler.handleRerun(cluster, entityType, entityName, - nominalTime, runId, workflowId, + nominalTime, runId, workflowId, workflowUser, System.currentTimeMillis()); GenericAlert.instrumentSucceededInstance(cluster, entityType, - entityName, nominalTime, workflowId, runId, operation, - SchemaHelper.formatDateUTC(startTime), - duration); + entityName, nominalTime, workflowId, workflowUser, runId, operation, + SchemaHelper.formatDateUTC(startTime), duration); notifySLAService(cluster, entityName, entityType, nominalTime, duration); } @@ -143,17 +144,14 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener } private void debug(MapMessage mapMessage) throws JMSException { - if (LOG.isDebugEnabled()) { - StringBuffer buff = new StringBuffer(); - buff.append("Received:{"); - for (ARG arg : ARG.values()) { - buff.append(arg.getArgName()).append('='). - append(mapMessage.getString(arg.getArgName())). 
- append(", "); - } - buff.append("}"); - LOG.debug(buff); + StringBuilder buff = new StringBuilder(); + buff.append("Received:{"); + for (ARG arg : ARG.values()) { + buff.append(arg.getArgName()).append('=') + .append(mapMessage.getString(arg.getArgName())).append(", "); } + buff.append("}"); + LOG.debug(buff); } @Override @@ -164,11 +162,14 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener public void closeSubscriber() throws FalconException { try { LOG.info("Closing subscriber on topic : " + this.topicName); - subscriber.close(); - connection.close(); + if (subscriber != null) { + subscriber.close(); + } + if (connection != null) { + connection.close(); + } } catch (JMSException e) { - LOG.error("Error closing subscriber of topic: " + this.toString(), - e); + LOG.error("Error closing subscriber of topic: " + this.toString(), e); throw new FalconException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java b/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java index 1cd7776..9ae1ad1 100644 --- a/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java +++ b/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java @@ -62,6 +62,8 @@ public class ProcessSubscriberService implements FalconService { @Override public void destroy() throws FalconException { - subscriber.closeSubscriber(); + if (subscriber != null) { // in case there was an exception while starting subscriber + subscriber.closeSubscriber(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java b/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java index 1db71fd..919f821 100644 --- a/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java +++ b/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java @@ -28,8 +28,8 @@ public class GenericAlertTest { @Test public void testWfInstanceFailedAlert() throws Exception { - GenericAlert.instrumentFailedInstance("cluster", "process", "agg-coord", "120:df", "ef-id", "1", - "DELETE", "now", "error", "none", 1242); + GenericAlert.instrumentFailedInstance("cluster", "process", "agg-coord", "120:df", + "ef-id", "wf-user", "1", "DELETE", "now", "error", "none", 1242); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3c51f105/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java b/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java index f1536f4..9b1d42a 100644 --- a/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java +++ b/prism/src/test/java/org/apache/falcon/service/FalconTopicSubscriberTest.java @@ -34,9 +34,7 @@ import javax.jms.*; */ public class FalconTopicSubscriberTest { - private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true"; - // private static final String BROKER_URL = - 
// "tcp://localhost:61616?daemon=true"; + private static final String BROKER_URL = "vm://localhost"; private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory"; private static final String TOPIC_NAME = "FALCON.ENTITY.TOPIC"; private static final String SECONDARY_TOPIC_NAME = "FALCON.ENTITY.SEC.TOPIC"; @@ -78,8 +76,7 @@ public class FalconTopicSubscriberTest { MapMessage mapMessage = session.createMapMessage(); message.getKeyValueMap().put(ARG.status, "FAILED"); for (ARG arg : ARG.values()) { - mapMessage.setString(arg.getPropName(), message - .getKeyValueMap().get(arg)); + mapMessage.setString(arg.getPropName(), message.getKeyValueMap().get(arg)); } producer.send(mapMessage); } @@ -101,6 +98,7 @@ public class FalconTopicSubscriberTest { message.getKeyValueMap().put(ARG.workflowId, "workflow-" + i); message.getKeyValueMap().put(ARG.topicName, TOPIC_NAME); message.getKeyValueMap().put(ARG.status, "SUCCEEDED"); + message.getKeyValueMap().put(ARG.workflowUser, "falcon"); return message; }