falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [5/9] FALCON-464 Enforce Authorization for REST API. Contributed by Venkatesh Seetharam
Date Fri, 08 Aug 2014 17:43:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 72669eb..52bb7ae 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -23,26 +23,18 @@ import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.GraphQuery;
 import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfaces;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.StartupProperties;
 import org.testng.Assert;
@@ -136,7 +128,8 @@ public class MetadataMappingServiceTest {
 
     @Test
     public void testOnAddClusterEntity() throws Exception {
-        clusterEntity = buildCluster(CLUSTER_ENTITY_NAME, COLO_NAME, "classification=production");
+        clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME, COLO_NAME,
+                "classification=production");
         configStore.publish(EntityType.CLUSTER, clusterEntity);
 
         verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
@@ -148,8 +141,9 @@ public class MetadataMappingServiceTest {
 
     @Test (dependsOnMethods = "testOnAddClusterEntity")
     public void testOnAddFeedEntity() throws Exception {
-        Feed impressionsFeed = buildFeed("impression-feed", clusterEntity, "classified-as=Secure", "analytics",
-                Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
+        Feed impressionsFeed = EntityBuilderTestUtil.buildFeed("impression-feed", clusterEntity,
+                "classified-as=Secure", "analytics");
+        addStorage(impressionsFeed, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
         configStore.publish(EntityType.FEED, impressionsFeed);
         inputFeeds.add(impressionsFeed);
         verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
@@ -157,16 +151,18 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
         Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
 
-        Feed clicksFeed = buildFeed("clicks-feed", clusterEntity, "classified-as=Secure,classified-as=Financial",
-                "analytics", Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
+        Feed clicksFeed = EntityBuilderTestUtil.buildFeed("clicks-feed", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "analytics");
+        addStorage(clicksFeed, Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
         configStore.publish(EntityType.FEED, clicksFeed);
         inputFeeds.add(clicksFeed);
         verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY);
         Assert.assertEquals(getVerticesCount(service.getGraph()), 9); // feed and financial vertex
         Assert.assertEquals(getEdgesCount(service.getGraph()), 11); // +5 = cluster + user + 2Group + Tag
 
-        Feed join1Feed = buildFeed("imp-click-join1", clusterEntity, "classified-as=Financial", "reporting,bi",
-                Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        Feed join1Feed = EntityBuilderTestUtil.buildFeed("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi");
+        addStorage(join1Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, join1Feed);
         outputFeeds.add(join1Feed);
         verifyEntityWasAddedToGraph(join1Feed.getName(), RelationshipType.FEED_ENTITY);
@@ -174,8 +170,9 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(getEdgesCount(service.getGraph()), 16); // +5 = cluster + user +
         // Group + 2Tags
 
-        Feed join2Feed = buildFeed("imp-click-join2", clusterEntity, "classified-as=Secure,classified-as=Financial",
-                "reporting,bi", Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
+        Feed join2Feed = EntityBuilderTestUtil.buildFeed("imp-click-join2", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "reporting,bi");
+        addStorage(join2Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, join2Feed);
         outputFeeds.add(join2Feed);
         verifyEntityWasAddedToGraph(join2Feed.getName(), RelationshipType.FEED_ENTITY);
@@ -187,15 +184,16 @@ public class MetadataMappingServiceTest {
 
     @Test (dependsOnMethods = "testOnAddFeedEntity")
     public void testOnAddProcessEntity() throws Exception {
-        processEntity = buildProcess(PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical");
-        addWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
+        processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical");
+        EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
 
         for (Feed inputFeed : inputFeeds) {
-            addInput(processEntity, inputFeed);
+            EntityBuilderTestUtil.addInput(processEntity, inputFeed);
         }
 
         for (Feed outputFeed : outputFeeds) {
-            addOutput(processEntity, outputFeed);
+            EntityBuilderTestUtil.addOutput(processEntity, outputFeed);
         }
 
         configStore.publish(EntityType.PROCESS, processEntity);
@@ -241,7 +239,8 @@ public class MetadataMappingServiceTest {
         service.init();
 
         // cannot modify cluster, adding a new cluster
-        bcpCluster = buildCluster("bcp-cluster", "east-coast", "classification=bcp");
+        bcpCluster = EntityBuilderTestUtil.buildCluster("bcp-cluster", "east-coast",
+                "classification=bcp");
         configStore.publish(EntityType.CLUSTER, bcpCluster);
         verifyEntityWasAddedToGraph("bcp-cluster", RelationshipType.CLUSTER_ENTITY);
 
@@ -253,9 +252,9 @@ public class MetadataMappingServiceTest {
     @Test(dependsOnMethods = "testOnChange")
     public void testOnFeedEntityChange() throws Exception {
         Feed oldFeed = inputFeeds.get(0);
-        Feed newFeed = buildFeed(oldFeed.getName(), clusterEntity,
-                "classified-as=Secured,source=data-warehouse", "reporting",
-                Storage.TYPE.FILESYSTEM, "jail://global:00/falcon/impression-feed/20140101");
+        Feed newFeed = EntityBuilderTestUtil.buildFeed(oldFeed.getName(), clusterEntity,
+                "classified-as=Secured,source=data-warehouse", "reporting");
+        addStorage(newFeed, Storage.TYPE.FILESYSTEM, "jail://global:00/falcon/impression-feed/20140101");
 
         try {
             configStore.initiateUpdate(newFeed);
@@ -301,9 +300,10 @@ public class MetadataMappingServiceTest {
     @Test(dependsOnMethods = "testOnFeedEntityChange")
     public void testOnProcessEntityChange() throws Exception {
         Process oldProcess = processEntity;
-        Process newProcess = buildProcess(oldProcess.getName(), bcpCluster, null);
-        addWorkflow(newProcess, WORKFLOW_NAME, "2.0.0");
-        addInput(newProcess, inputFeeds.get(0));
+        Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), bcpCluster,
+                null);
+        EntityBuilderTestUtil.addProcessWorkflow(newProcess, WORKFLOW_NAME, "2.0.0");
+        EntityBuilderTestUtil.addInput(newProcess, inputFeeds.get(0));
 
         try {
             configStore.initiateUpdate(newProcess);
@@ -348,44 +348,6 @@ public class MetadataMappingServiceTest {
         }
     }
 
-    private static Cluster buildCluster(String name, String colo, String tags) {
-        Cluster cluster = new Cluster();
-        cluster.setName(name);
-        cluster.setColo(colo);
-        cluster.setTags(tags);
-
-        Interfaces interfaces = new Interfaces();
-        cluster.setInterfaces(interfaces);
-
-        Interface storage = new Interface();
-        storage.setEndpoint("jail://global:00");
-        storage.setType(Interfacetype.WRITE);
-        cluster.getInterfaces().getInterfaces().add(storage);
-
-        return cluster;
-    }
-
-    private static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups,
-                                  Storage.TYPE storageType, String uriTemplate) {
-        Feed feed = new Feed();
-        feed.setName(feedName);
-        feed.setTags(tags);
-        feed.setGroups(groups);
-        feed.setFrequency(Frequency.fromString("hours(1)"));
-
-        org.apache.falcon.entity.v0.feed.Clusters
-                clusters = new org.apache.falcon.entity.v0.feed.Clusters();
-        feed.setClusters(clusters);
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
-                new org.apache.falcon.entity.v0.feed.Cluster();
-        feedCluster.setName(cluster.getName());
-        clusters.getClusters().add(feedCluster);
-
-        addStorage(feed, storageType, uriTemplate);
-
-        return feed;
-    }
-
     private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
         if (storageType == Storage.TYPE.FILESYSTEM) {
             Locations locations = new Locations();
@@ -402,53 +364,6 @@ public class MetadataMappingServiceTest {
         }
     }
 
-    private static Process buildProcess(String processName, Cluster cluster,
-                                        String tags) throws Exception {
-        Process processEntity = new Process();
-        processEntity.setName(processName);
-        processEntity.setTags(tags);
-
-        org.apache.falcon.entity.v0.process.Cluster processCluster =
-                new org.apache.falcon.entity.v0.process.Cluster();
-        processCluster.setName(cluster.getName());
-        processEntity.setClusters(new org.apache.falcon.entity.v0.process.Clusters());
-        processEntity.getClusters().getClusters().add(processCluster);
-
-        return processEntity;
-    }
-
-    private static void addWorkflow(Process process, String workflowName, String version) {
-        Workflow workflow = new Workflow();
-        workflow.setName(workflowName);
-        workflow.setVersion(version);
-        workflow.setEngine(EngineType.PIG);
-        workflow.setPath("/falcon/test/workflow");
-
-        process.setWorkflow(workflow);
-    }
-
-    private static void addInput(Process process, Feed feed) {
-        if (process.getInputs() == null) {
-            process.setInputs(new Inputs());
-        }
-
-        Inputs inputs = process.getInputs();
-        Input input = new Input();
-        input.setFeed(feed.getName());
-        inputs.getInputs().add(input);
-    }
-
-    private static void addOutput(Process process, Feed feed) {
-        if (process.getOutputs() == null) {
-            process.setOutputs(new Outputs());
-        }
-
-        Outputs outputs = process.getOutputs();
-        Output output = new Output();
-        output.setFeed(feed.getName());
-        outputs.getOutputs().add(output);
-    }
-
     private void verifyEntityWasAddedToGraph(String entityName, RelationshipType entityType) {
         Vertex entityVertex = getEntityVertex(entityName, entityType);
         Assert.assertNotNull(entityVertex);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
index fe7155b..a1861e1 100644
--- a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
+++ b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.security;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -32,4 +33,12 @@ public class CurrentUserTest {
         CurrentUser.authenticate(id);
         Assert.assertEquals(CurrentUser.getUser(), id);
     }
+
+    @Test
+    public void testGetProxyUser() throws Exception {
+        CurrentUser.authenticate("proxy");
+        UserGroupInformation proxyUgi = CurrentUser.getProxyUgi();
+        Assert.assertNotNull(proxyUgi);
+        Assert.assertEquals(proxyUgi.getUserName(), "proxy");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
new file mode 100644
index 0000000..0fd869e
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/security/DefaultAuthorizationProviderTest.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.Collection;
+
+/**
+ * Unit tests for DefaultAuthorizationProvider.
+ */
+public class DefaultAuthorizationProviderTest {
+
+    public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+    public static final String PROCESS_ENTITY_NAME = "sample-process";
+
+    private UserGroupInformation realUser;
+    private ConfigurationStore configStore;
+    private Cluster clusterEntity;
+    private Feed feedEntity;
+    private org.apache.falcon.entity.v0.process.Process processEntity;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        realUser = UserGroupInformation.createUserForTesting("falcon", new String[]{"falcon", });
+
+        CurrentUser.authenticate(EntityBuilderTestUtil.USER);
+        org.testng.Assert.assertEquals(CurrentUser.getUser(), EntityBuilderTestUtil.USER);
+
+        configStore = ConfigurationStore.get();
+
+        addClusterEntity();
+        addFeedEntity();
+        addProcessEntity();
+        org.testng.Assert.assertNotNull(processEntity);
+    }
+
+    public void addClusterEntity() throws Exception {
+        clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME);
+        configStore.publish(EntityType.CLUSTER, clusterEntity);
+    }
+
+    public void addFeedEntity() throws Exception {
+        feedEntity = EntityBuilderTestUtil.buildFeed("sample-feed", clusterEntity,
+                "classified-as=Secure", "analytics");
+        addStorage(feedEntity, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
+        configStore.publish(EntityType.FEED, feedEntity);
+    }
+
+    private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            Locations locations = new Locations();
+            feed.setLocations(locations);
+
+            Location location = new Location();
+            location.setType(LocationType.DATA);
+            location.setPath(uriTemplate);
+            feed.getLocations().getLocations().add(location);
+        } else {
+            CatalogTable table = new CatalogTable();
+            table.setUri(uriTemplate);
+            feed.setTable(table);
+        }
+    }
+
+    public void addProcessEntity() throws Exception {
+        processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME,
+                clusterEntity, "classified-as=Critical");
+        EntityBuilderTestUtil.addProcessWorkflow(processEntity);
+        EntityBuilderTestUtil.addProcessACL(processEntity);
+
+        configStore.publish(EntityType.PROCESS, processEntity);
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        cleanupStore();
+    }
+
+    protected void cleanupStore() throws FalconException {
+        configStore = ConfigurationStore.get();
+        for (EntityType type : EntityType.values()) {
+            Collection<String> entities = configStore.getEntities(type);
+            for (String entity : entities) {
+                configStore.remove(type, entity);
+            }
+        }
+    }
+
+    @DataProvider(name = "adminResourceActions")
+    private Object[][] createAdminResourceActions() {
+        return new Object[][] {
+            {"version"},
+            {"stack"},
+            {"config"},
+        };
+    }
+
+    @Test (dataProvider = "adminResourceActions")
+    public void testAuthorizeAdminResourceAdmin(String action) throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("admin", action, null, null, proxyUgi);
+    }
+
+    @Test
+    public void testAuthorizeAdminResourceAdminUserBadGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin-group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("admin", "version", null, null, proxyUgi);
+    }
+
+    @Test
+    public void testAuthorizeAdminResourceAdminGroupBadUser() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty(
+                "falcon.security.authorization.admin.groups", "admin-group");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin-user", realUser, new String[]{"admin-group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("admin", "version", null, null, proxyUgi);
+    }
+
+    @Test (expectedExceptions = AuthorizationException.class)
+    public void testAuthorizeAdminResourceInvalidUserAndGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin-user", realUser, new String[]{"admin-group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("admin", "version", null, null, proxyUgi);
+        Assert.fail("User does not belong to both admin-users not groups");
+    }
+
+    @DataProvider(name = "entityResourceActions")
+    private Object[][] createEntityResourceActions() {
+        return new Object[][] {
+            {"entities", "list", "feed"},
+            {"entities", "list", "process"},
+            {"entities", "list", "cluster"},
+        };
+    }
+
+    @Test (dataProvider = "entityResourceActions")
+    public void testAuthorizeEntitiesInstancesReadOnlyResource(String resource,
+                                                               String action,
+                                                               String entityType) throws Exception {
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin-user", realUser, new String[]{"admin-group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource(resource, action, entityType, null, proxyUgi);
+    }
+
+    @DataProvider(name = "entityLifecycleResourceActions")
+    private Object[][] createEntityLifecycleResourceActions() {
+        return new Object[][] {
+            {"entities", "status", "cluster", "primary-cluster"},
+            {"entities", "status", "process", "sample-process"},
+            {"entities", "status", "feed", "sample-feed"},
+            {"instance", "status", "process", "sample-process"},
+            {"instance", "running", "process", "sample-process"},
+            {"instance", "running", "feed", "sample-feed"},
+        };
+    }
+
+    @Test(dataProvider = "entityLifecycleResourceActions")
+    public void testAuthorizeEntitiesInstancesLifecycleResource(String resource, String action,
+                                                                String entityType,
+                                                                String entityName) throws Exception {
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                EntityBuilderTestUtil.USER, realUser, new String[]{EntityBuilderTestUtil.USER, });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource(resource, action, entityType, entityName, proxyUgi);
+    }
+
+    @Test(dataProvider = "entityLifecycleResourceActions",
+            expectedExceptions = AuthorizationException.class)
+    public void testAuthorizeEntitiesInstancesLifecycleResourceBadUGI(String resource,
+                                                                      String action,
+                                                                      String entityType,
+                                                                      String entityName)
+        throws Exception {
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin-user", realUser, new String[]{"admin-group", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource(resource, action, entityType, entityName, proxyUgi);
+    }
+
+    @Test (expectedExceptions = AuthorizationException.class)
+    public void testAuthorizeBadResource() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("invalid", "version", null, null, proxyUgi);
+        Assert.fail("Bad resource");
+    }
+
+    @Test (expectedExceptions = IllegalArgumentException.class)
+    public void testAuthorizeNullResource() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource(null, "version", null, null, proxyUgi);
+        Assert.fail("Bad resource");
+    }
+
+    @Test (expectedExceptions = IllegalArgumentException.class)
+    public void testAuthorizeBadAction() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("entities", null, "feedz", null, proxyUgi);
+        Assert.fail("Bad action");
+    }
+
+    @Test (expectedExceptions = IllegalArgumentException.class)
+    public void testAuthorizeNullEntityType() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("entities", "list", null, "primary-cluster", proxyUgi);
+        Assert.fail("Bad entity type");
+    }
+
+    @Test (expectedExceptions = IllegalArgumentException.class)
+    public void testAuthorizeBadEntityType() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("entities", "list", "clusterz", "primary-cluster", proxyUgi);
+        Assert.fail("Bad entity type");
+    }
+
+    @Test
+    public void testAuthorizeValidatePOSTOperations() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        EntityBuilderTestUtil.addProcessACL(processEntity, "admin", "admin");
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeEntity(processEntity.getName(), "process",
+                processEntity.getACL(), "submit", proxyUgi);
+    }
+
+    @Test (expectedExceptions = AuthorizationException.class)
+    public void testAuthorizeResourceOperationsBadEntity() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("entities", "status", "feed", processEntity.getName(), proxyUgi);
+        Assert.fail("Bad entity");
+    }
+
+    @Test
+    public void testAuthorizeValidatePOSTOperationsGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        EntityBuilderTestUtil.addProcessACL(processEntity, "admin-user", "admin");
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeEntity(processEntity.getName(), "process",
+                processEntity.getACL(), "submit", proxyUgi);
+    }
+
+    @Test (expectedExceptions = AuthorizationException.class)
+    public void testAuthorizeValidatePOSTOperationsBadUserAndGroup() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        EntityBuilderTestUtil.addProcessACL(processEntity, "admin-user", "admin-group");
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeEntity(processEntity.getName(), "process",
+                processEntity.getACL(), "submit", proxyUgi);
+    }
+
+    @Test
+    public void testAuthorizeLineageResource() throws Exception {
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.users", "admin");
+        StartupProperties.get().setProperty("falcon.security.authorization.admin.groups", "admin");
+
+        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUserForTesting(
+                "admin", realUser, new String[]{"admin", });
+
+        DefaultAuthorizationProvider provider = new DefaultAuthorizationProvider();
+        provider.authorizeResource("lineage", "vertices", null, null, proxyUgi);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java b/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
index 630aa4b..a36d916 100644
--- a/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/security/SecurityUtilTest.java
@@ -19,7 +19,6 @@
 package org.apache.falcon.security;
 
 import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -61,9 +60,25 @@ public class SecurityUtilTest {
     }
 
     @Test
-    public void testGetProxyUser() throws Exception {
-        UserGroupInformation proxyUgi = SecurityUtil.getProxyUser("proxy");
-        Assert.assertNotNull(proxyUgi);
-        Assert.assertEquals(proxyUgi.getUserName(), "proxy");
+    public void testIsAuthorizationEnabledByDefault() throws Exception {
+        Assert.assertFalse(SecurityUtil.isAuthorizationEnabled()); // NOTE(review): assumes no earlier test left it "true"
+    }
+
+    @Test
+    public void testIsAuthorizationEnabled() throws Exception {
+        try {
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+            Assert.assertTrue(SecurityUtil.isAuthorizationEnabled());
+        } finally {
+            // restore "false" even when the assertion fails, so that other tests see authorization disabled
+            StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+        }
+    }
+
+    @Test
+    public void testGetAuthorizationProviderByDefault() throws Exception {
+        Assert.assertNotNull(SecurityUtil.getAuthorizationProvider());
+        Assert.assertEquals(SecurityUtil.getAuthorizationProvider().getClass(),
+                DefaultAuthorizationProvider.class); // TestNG order is (actual, expected)
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/FalconWebException.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/FalconWebException.java b/prism/src/main/java/org/apache/falcon/FalconWebException.java
index 736a8b5..251a1c1 100644
--- a/prism/src/main/java/org/apache/falcon/FalconWebException.java
+++ b/prism/src/main/java/org/apache/falcon/FalconWebException.java
@@ -21,6 +21,7 @@ package org.apache.falcon;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,8 +38,13 @@ public class FalconWebException extends WebApplicationException {
 
     private static final Logger LOG = LoggerFactory.getLogger(FalconWebException.class);
 
-    public static FalconWebException newException(Throwable e, Response.Status status) {
-        return newException(getMessage(e), status);
+    public static FalconWebException newException(Throwable e,
+                                                  Response.Status status) {
+        // an authorization failure is always reported as 403, overriding the requested status
+        final Response.Status effectiveStatus =
+            e instanceof AuthorizationException ? Response.Status.FORBIDDEN : status;
+
+        return newException(e.getMessage(), effectiveStatus);
     }
 
     public static FalconWebException newInstanceException(Throwable e, Response.Status status) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 3d9078e..c775510 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -460,8 +460,6 @@ public abstract class AbstractEntityManager {
      *
      * @param type entity type
      * @param fieldStr fields that the query is interested in, separated by comma
-     *
-     * @param type entity type
      * @return String
      */
     public EntityList getEntityList(String type, String fieldStr) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index f98aece..f5329ef 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.UnschedulableEntityException;
 import org.apache.falcon.monitors.Dimension;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,8 +45,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
     /**
      * Schedules an submitted entity immediately.
      *
-     * @param type
-     * @param entity
+     * @param type   entity type
+     * @param entity entity name
      * @return APIResult
      */
     public APIResult schedule(
@@ -63,7 +64,9 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
         }
     }
 
-    private synchronized void scheduleInternal(String type, String entity) throws FalconException {
+    private synchronized void scheduleInternal(String type, String entity)
+        throws FalconException, AuthorizationException {
+
         checkSchedulableEntity(type);
         Entity entityObj = EntityUtil.getEntity(type, entity);
         getWorkflowEngine().schedule(entityObj);
@@ -72,8 +75,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
     /**
      * Submits a new entity and schedules it immediately.
      *
-     * @param type
-     * @return
+     * @param type   entity type
+     * @return APIResult
      */
     public APIResult submitAndSchedule(
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
@@ -95,8 +98,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
     /**
      * Suspends a running entity.
      *
-     * @param type
-     * @param entity
+     * @param type   entity type
+     * @param entity entity name
      * @return APIResult
      */
     public APIResult suspend(
@@ -123,8 +126,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
     /**
      * Resumes a suspended entity.
      *
-     * @param type
-     * @param entity
+     * @param type   entity type
+     * @param entity entity name
      * @return APIResult
      */
     public APIResult resume(

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java b/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
deleted file mode 100644
index 5a56b9a..0000000
--- a/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
-import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
-import org.apache.log4j.NDC;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.FilterChain;
-import javax.servlet.FilterConfig;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-
-/**
- * This enforces authentication as part of the filter before processing the request.
- * Subclass of {@link AuthenticationFilter}.
- */
-public class BasicAuthFilter extends AuthenticationFilter {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BasicAuthFilter.class);
-
-    /**
-     * Constant for the configuration property that indicates the prefix.
-     */
-    protected static final String FALCON_PREFIX = "falcon.http.authentication.";
-    protected static final String KERBEROS_PRINCIPAL = FALCON_PREFIX + KerberosAuthenticationHandler.PRINCIPAL;
-
-    /**
-     * Constant for the configuration property that indicates the blacklisted super users for falcon.
-     */
-    private static final String BLACK_LISTED_USERS_KEY = FALCON_PREFIX + "blacklisted.users";
-
-    /**
-     * An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
-     * before invoking the actual resource.
-     */
-    private HttpServlet optionsServlet;
-    private Set<String> blackListedUsers;
-
-    /**
-     * Initialize the filter.
-     *
-     * @param filterConfig filter configuration.
-     * @throws ServletException thrown if the filter could not be initialized.
-     */
-    @Override
-    public void init(FilterConfig filterConfig) throws ServletException {
-        LOG.info("BasicAuthFilter initialization started");
-        super.init(filterConfig);
-
-        optionsServlet = new HttpServlet() {};
-        optionsServlet.init();
-
-        initializeBlackListedUsers();
-    }
-
-    private void initializeBlackListedUsers() {
-        blackListedUsers = new HashSet<String>();
-        String blackListedUserConfig = StartupProperties.get().getProperty(BLACK_LISTED_USERS_KEY);
-        if (!StringUtils.isEmpty(blackListedUserConfig)) {
-            blackListedUsers.addAll(Arrays.asList(blackListedUserConfig.split(",")));
-        }
-    }
-
-    /**
-     * Returns the configuration from Oozie configuration to be used by the authentication filter.
-     * <p/>
-     * All properties from Oozie configuration which name starts with {@link #FALCON_PREFIX} will
-     * be returned. The keys of the returned properties are trimmed from the {@link #FALCON_PREFIX}
-     * prefix, for example the Oozie configuration property name 'oozie.authentication.type' will
-     * be just 'type'.
-     *
-     * @param configPrefix configuration prefix, this parameter is ignored by this implementation.
-     * @param filterConfig filter configuration, this parameter is ignored by this implementation.
-     * @return all Oozie configuration properties prefixed with {@link #FALCON_PREFIX}, without the
-     * prefix.
-     */
-    @Override
-    protected Properties getConfiguration(String configPrefix, FilterConfig filterConfig) {
-        Properties authProperties = new Properties();
-        Properties configProperties = StartupProperties.get();
-
-        // setting the cookie path to root '/' so it is used for all resources.
-        authProperties.setProperty(AuthenticationFilter.COOKIE_PATH, "/");
-
-        for (Map.Entry entry : configProperties.entrySet()) {
-            String name = (String) entry.getKey();
-            if (name.startsWith(FALCON_PREFIX)) {
-                String value = (String) entry.getValue();
-                name = name.substring(FALCON_PREFIX.length());
-                authProperties.setProperty(name, value);
-            }
-        }
-
-        if (UserGroupInformation.isSecurityEnabled()) { // replace _HOST in principal
-            String principal = getKerberosPrincipalWithSubstitutedHost(configProperties);
-            // principal cannot be null in secure mode, is validated in submission
-            authProperties.setProperty(KerberosAuthenticationHandler.PRINCIPAL, principal);
-        }
-
-        return authProperties;
-    }
-
-    /**
-     * Replaces _HOST in the principal with the actual hostname.
-     *
-     * @param configProperties Falcon config properties
-     * @return principal with _HOST substituted
-     */
-    private String getKerberosPrincipalWithSubstitutedHost(Properties configProperties) {
-        String principal = configProperties.getProperty(KERBEROS_PRINCIPAL);
-        try {
-            principal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(
-                    principal, SecurityUtil.getLocalHostName());
-        } catch (IOException ignored) {
-            // do nothing
-        }
-
-        return principal;
-    }
-
-    @Override
-    public void doFilter(final ServletRequest request, final ServletResponse response,
-                         final FilterChain filterChain) throws IOException, ServletException {
-
-        FilterChain filterChainWrapper = new FilterChain() {
-
-            @Override
-            public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
-                throws IOException, ServletException {
-                HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
-
-                if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication
-                    optionsServlet.service(request, response);
-                } else {
-                    final String user = getUserFromRequest(httpRequest);
-                    if (StringUtils.isEmpty(user)) {
-                        ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
-                                "User can't be empty");
-                    } else if (blackListedUsers.contains(user)) {
-                        ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
-                                "User can't be a superuser:" + BLACK_LISTED_USERS_KEY);
-                    } else {
-                        try {
-                            String requestId = UUID.randomUUID().toString();
-                            NDC.push(user + ":" + httpRequest.getMethod() + "/" + httpRequest.getPathInfo());
-                            NDC.push(requestId);
-                            CurrentUser.authenticate(user);
-                            LOG.info("Request from user: {}, URL={}", user, getRequestUrl(httpRequest));
-
-                            filterChain.doFilter(servletRequest, servletResponse);
-                        } finally {
-                            NDC.pop();
-                            NDC.pop();
-                        }
-                    }
-                }
-            }
-
-            private String getUserFromRequest(HttpServletRequest httpRequest) {
-                String user = httpRequest.getRemoteUser(); // this is available from wrapper in super class
-                if (!StringUtils.isEmpty(user)) {
-                    return user;
-                }
-
-                user = httpRequest.getParameter("user.name"); // available in query-param
-                if (!StringUtils.isEmpty(user)) {
-                    return user;
-                }
-
-                user = httpRequest.getHeader("Remote-User"); // backwards-compatibility
-                if (!StringUtils.isEmpty(user)) {
-                    return user;
-                }
-
-                return null;
-            }
-
-            private String getRequestUrl(HttpServletRequest request) {
-                StringBuffer url = request.getRequestURL();
-                if (request.getQueryString() != null) {
-                    url.append("?").append(request.getQueryString());
-                }
-
-                return url.toString();
-            }
-        };
-
-        super.doFilter(request, response, filterChainWrapper);
-    }
-
-    @Override
-    public void destroy() {
-        if (optionsServlet != null) {
-            optionsServlet.destroy();
-        }
-
-        super.destroy();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java
new file mode 100644
index 0000000..54023d1
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/security/FalconAuthenticationFilter.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.log4j.NDC;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * This enforces authentication as part of the filter before processing the request.
+ * Subclass of {@link org.apache.hadoop.security.authentication.server.AuthenticationFilter}.
+ */
+public class FalconAuthenticationFilter
+        extends org.apache.hadoop.security.authentication.server.AuthenticationFilter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconAuthenticationFilter.class);
+
+    /**
+     * Constant for the configuration property that indicates the prefix.
+     */
+    protected static final String FALCON_PREFIX = "falcon.http.authentication.";
+    protected static final String KERBEROS_PRINCIPAL = FALCON_PREFIX + KerberosAuthenticationHandler.PRINCIPAL;
+
+    /**
+     * Constant for the configuration property that indicates the blacklisted super users for falcon.
+     */
+    private static final String BLACK_LISTED_USERS_KEY = FALCON_PREFIX + "blacklisted.users";
+
+    /**
+     * An options servlet is used to authenticate users. OPTIONS method is used for triggering authentication
+     * before invoking the actual resource.
+     */
+    private HttpServlet optionsServlet;
+    private Set<String> blackListedUsers;
+
+    /**
+     * Initialize the filter.
+     *
+     * @param filterConfig filter configuration.
+     * @throws ServletException thrown if the filter could not be initialized.
+     */
+    @Override
+    public void init(FilterConfig filterConfig) throws ServletException {
+        LOG.info("FalconAuthenticationFilter initialization started");
+        super.init(filterConfig);
+
+        optionsServlet = new HttpServlet() {};
+        optionsServlet.init();
+
+        initializeBlackListedUsers();
+    }
+
+    private void initializeBlackListedUsers() {
+        blackListedUsers = new HashSet<String>();
+        String blackListedUserConfig = StartupProperties.get().getProperty(BLACK_LISTED_USERS_KEY);
+        if (!StringUtils.isEmpty(blackListedUserConfig)) {
+            // strip each entry so that a value such as "root, hdfs" blacklists both users
+            blackListedUsers.addAll(Arrays.asList(StringUtils.stripAll(blackListedUserConfig.split(","))));
+        }
+    }
+
+    /**
+     * Returns the configuration from the Falcon startup properties to be used by the authentication filter.
+     * <p/>
+     * All startup properties whose name starts with {@link #FALCON_PREFIX} are returned with that
+     * prefix removed from the key, for example 'falcon.http.authentication.type' will be just 'type'.
+     * The cookie path defaults to '/' and, in secure mode, _HOST in the Kerberos principal is substituted.
+     *
+     * @param configPrefix configuration prefix, this parameter is ignored by this implementation.
+     * @param filterConfig filter configuration, this parameter is ignored by this implementation.
+     * @return all Falcon startup properties prefixed with {@link #FALCON_PREFIX}, without the
+     * prefix.
+     */
+    @Override
+    protected Properties getConfiguration(String configPrefix, FilterConfig filterConfig) {
+        Properties authProperties = new Properties();
+        Properties configProperties = StartupProperties.get();
+
+        // setting the cookie path to root '/' so it is used for all resources.
+        authProperties.setProperty(
+                org.apache.hadoop.security.authentication.server.AuthenticationFilter.COOKIE_PATH, "/");
+
+        for (Map.Entry entry : configProperties.entrySet()) {
+            String name = (String) entry.getKey();
+            if (name.startsWith(FALCON_PREFIX)) {
+                String value = (String) entry.getValue();
+                name = name.substring(FALCON_PREFIX.length());
+                authProperties.setProperty(name, value);
+            }
+        }
+
+        if (UserGroupInformation.isSecurityEnabled()) { // replace _HOST in principal
+            String principal = getKerberosPrincipalWithSubstitutedHost(configProperties);
+            // principal cannot be null in secure mode, is validated in submission
+            authProperties.setProperty(KerberosAuthenticationHandler.PRINCIPAL, principal);
+        }
+
+        return authProperties;
+    }
+
+    /**
+     * Replaces _HOST in the principal with the actual hostname.
+     *
+     * @param configProperties Falcon config properties
+     * @return principal with _HOST substituted, or the configured value if the substitution fails
+     */
+    private String getKerberosPrincipalWithSubstitutedHost(Properties configProperties) {
+        String principal = configProperties.getProperty(KERBEROS_PRINCIPAL);
+        try {
+            principal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(
+                    principal, SecurityUtil.getLocalHostName());
+        } catch (IOException e) {
+            LOG.warn("Unable to substitute _HOST in principal {}, using it as configured", principal, e);
+        }
+
+        return principal;
+    }
+
+    @Override
+    public void doFilter(final ServletRequest request, final ServletResponse response,
+                         final FilterChain filterChain) throws IOException, ServletException {
+
+        FilterChain filterChainWrapper = new FilterChain() {
+
+            @Override
+            public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
+                throws IOException, ServletException {
+                HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
+
+                if (httpRequest.getMethod().equals("OPTIONS")) { // option request meant only for authentication
+                    optionsServlet.service(request, response);
+                } else {
+                    final String user = getUserFromRequest(httpRequest);
+                    if (StringUtils.isEmpty(user)) {
+                        ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+                                "User can't be empty");
+                    } else if (blackListedUsers.contains(user)) {
+                        ((HttpServletResponse) response).sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+                                "User can't be a superuser:" + BLACK_LISTED_USERS_KEY);
+                    } else {
+                        try {
+                            String requestId = UUID.randomUUID().toString();
+                            NDC.push(user + ":" + httpRequest.getMethod() + "/" + httpRequest.getPathInfo());
+                            NDC.push(requestId);
+                            CurrentUser.authenticate(user);
+                            LOG.info("Request from user: {}, URL={}", user, getRequestUrl(httpRequest));
+
+                            filterChain.doFilter(servletRequest, servletResponse);
+                        } finally {
+                            NDC.pop();
+                            NDC.pop();
+                        }
+                    }
+                }
+            }
+
+            private String getUserFromRequest(HttpServletRequest httpRequest) {
+                String user = httpRequest.getRemoteUser(); // this is available from wrapper in super class
+                if (!StringUtils.isEmpty(user)) {
+                    return user;
+                }
+
+                user = httpRequest.getParameter("user.name"); // available in query-param
+                if (!StringUtils.isEmpty(user)) {
+                    return user;
+                }
+
+                user = httpRequest.getHeader("Remote-User"); // backwards-compatibility
+                if (!StringUtils.isEmpty(user)) {
+                    return user;
+                }
+
+                return null;
+            }
+
+            private String getRequestUrl(HttpServletRequest request) {
+                StringBuffer url = request.getRequestURL();
+                if (request.getQueryString() != null) {
+                    url.append("?").append(request.getQueryString());
+                }
+
+                return url.toString();
+            }
+        };
+
+        super.doFilter(request, response, filterChainWrapper);
+    }
+
+    @Override
+    public void destroy() {
+        if (optionsServlet != null) {
+            optionsServlet.destroy();
+        }
+
+        super.destroy();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
new file mode 100644
index 0000000..ceb13d0
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/security/FalconAuthorizationFilter.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.FalconException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This enforces authorization as part of the filter before processing the request.
+ */
+public class FalconAuthorizationFilter implements Filter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconAuthorizationFilter.class);
+
+    private boolean isAuthorizationEnabled;
+    private AuthorizationProvider authorizationProvider;
+
+    @Override
+    public void init(FilterConfig filterConfig) throws ServletException {
+        try {
+            isAuthorizationEnabled = SecurityUtil.isAuthorizationEnabled();
+            if (isAuthorizationEnabled) {
+                LOG.info("Falcon is running with authorization enabled");
+            }
+
+            authorizationProvider = SecurityUtil.getAuthorizationProvider();
+        } catch (FalconException e) {
+            throw new ServletException(e);
+        }
+    }
+
+    /** Authorizes the request when enabled; a path lacking a resource or an action gets a 400. */
+    @Override
+    public void doFilter(ServletRequest request, ServletResponse response,
+                         FilterChain filterChain) throws IOException, ServletException {
+        if (isAuthorizationEnabled) {
+            final String[] paths = getResourcesAndActions(((HttpServletRequest) request).getPathInfo());
+            if (paths.length < 2) { // nothing to authorize against, so fail closed
+                ((javax.servlet.http.HttpServletResponse) response).sendError(
+                    javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST, "Missing resource or action in path");
+                return;
+            }
+            final String resource = paths[0];
+            final String action = paths[1];
+            final String entityType = paths.length > 2 ? paths[2] : null;
+            final String entityName = paths.length > 3 ? paths[3] : null;
+            LOG.info("Authorizing user={} against resource={}, action={}, entity name={}, "
+                + "entity type={}", CurrentUser.getUser(), resource, action, entityName, entityType);
+            authorizationProvider.authorizeResource(resource, action,
+                    entityType, entityName, CurrentUser.getProxyUgi());
+        }
+
+        filterChain.doFilter(request, response);
+    }
+
+    /**
+     * Splits the path info into resource, action and, when present, entity type and entity name.
+     * For paths under "graphs" that prefix is skipped and only resource and action are returned.
+     * @param pathInfo servlet path info, may be null
+     * @return the segments found; fewer than two if the path lacks a resource or an action
+     */
+    private static String[] getResourcesAndActions(String pathInfo) {
+        if (pathInfo == null || pathInfo.length() < 2) {
+            return new String[0];
+        }
+        final String[] pathSplits = pathInfo.substring(1).split("/");
+        final boolean isGraphs = pathSplits.length > 0 && "graphs".equals(pathSplits[0]);
+        final int to = Math.min(pathSplits.length, isGraphs ? 3 : 4); // graphs/<resource>/<action> only
+        List<String> splits = new ArrayList<String>();
+        for (int i = isGraphs ? 1 : 0; i < to; i++) {
+            splits.add(pathSplits[i]);
+        }
+        return splits.toArray(new String[splits.size()]);
+    }
+
+    @Override
+    public void destroy() {
+        authorizationProvider = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/prism/src/main/webapp/WEB-INF/web.xml b/prism/src/main/webapp/WEB-INF/web.xml
index e7c6234..ba139aa 100644
--- a/prism/src/main/webapp/WEB-INF/web.xml
+++ b/prism/src/main/webapp/WEB-INF/web.xml
@@ -26,12 +26,22 @@
     <description>Apache Falcon Prism</description>
 
     <filter>
-        <filter-name>auth</filter-name>
-        <filter-class>org.apache.falcon.security.BasicAuthFilter</filter-class>
+        <filter-name>authentication</filter-name>
+        <filter-class>org.apache.falcon.security.FalconAuthenticationFilter</filter-class>
     </filter>
 
+    <filter>
+        <filter-name>authorization</filter-name>
+        <filter-class>org.apache.falcon.security.FalconAuthorizationFilter</filter-class>
+    </filter>
+
+    <filter-mapping>
+        <filter-name>authentication</filter-name>
+        <servlet-name>FalconProxyAPI</servlet-name>
+    </filter-mapping>
+
     <filter-mapping>
-        <filter-name>auth</filter-name>
+        <filter-name>authorization</filter-name>
         <servlet-name>FalconProxyAPI</servlet-name>
     </filter-mapping>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index 26c24c7..d664782 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -22,27 +22,18 @@ import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfaces;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
-import org.apache.falcon.entity.v0.process.Clusters;
 import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.metadata.LineageArgs;
 import org.apache.falcon.metadata.LineageRecorder;
 import org.apache.falcon.metadata.MetadataMappingService;
@@ -463,73 +454,36 @@ public class LineageMetadataResourceTest {
     }
 
+    /**
+     * Builds the test cluster entity tagged "classification=production", keeps it in
+     * {@code clusterEntity} and publishes it to the configuration store.
+     *
+     * @throws Exception if building or publishing the entity fails
+     */
     public void addClusterEntity() throws Exception {
-        clusterEntity = buildCluster(CLUSTER_ENTITY_NAME, COLO_NAME, "classification=production");
+        clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME,
+                COLO_NAME, "classification=production");
         configStore.publish(EntityType.CLUSTER, clusterEntity);
     }
 
+    /**
+     * Builds four feeds on the test cluster, attaches a file system storage location to
+     * each and publishes them to the configuration store. "impression-feed" and
+     * "clicks-feed" are recorded in {@code inputFeeds}; "imp-click-join1" and
+     * "imp-click-join2" are recorded in {@code outputFeeds}.
+     *
+     * @throws Exception if building or publishing a feed fails
+     */
     public void addFeedEntity() throws Exception {
-        Feed impressionsFeed = buildFeed("impression-feed", clusterEntity, "classified-as=Secure", "analytics",
-                        Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}${MONTH}${DAY}");
+        Feed impressionsFeed = EntityBuilderTestUtil.buildFeed("impression-feed", clusterEntity,
+                "classified-as=Secure", "analytics");
+        addStorage(impressionsFeed, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, impressionsFeed);
         inputFeeds.add(impressionsFeed);
 
-        Feed clicksFeed = buildFeed("clicks-feed", clusterEntity, null, null,
-                Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}${MONTH}${DAY}");
+        Feed clicksFeed = EntityBuilderTestUtil.buildFeed("clicks-feed", clusterEntity, null, null);
+        addStorage(clicksFeed, Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, clicksFeed);
         inputFeeds.add(clicksFeed);
 
-        Feed join1Feed = buildFeed("imp-click-join1", clusterEntity, "classified-as=Financial",
-                        "reporting,bi",
-                        Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        Feed join1Feed = EntityBuilderTestUtil.buildFeed("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi");
+        addStorage(join1Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, join1Feed);
         outputFeeds.add(join1Feed);
 
-        Feed join2Feed = buildFeed("imp-click-join2", clusterEntity,
-                "classified-as=Secure,classified-as=Financial",
-                "reporting,bi", Storage.TYPE.FILESYSTEM,
-                "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
+        Feed join2Feed = EntityBuilderTestUtil.buildFeed("imp-click-join2", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "reporting,bi");
+        addStorage(join2Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
         configStore.publish(EntityType.FEED, join2Feed);
         outputFeeds.add(join2Feed);
     }
 
-    public static Cluster buildCluster(String name, String colo, String tags) {
-        Cluster cluster = new Cluster();
-        cluster.setName(name);
-        cluster.setColo(colo);
-        cluster.setTags(tags);
-
-        Interfaces interfaces = new Interfaces();
-        cluster.setInterfaces(interfaces);
-
-        Interface storage = new Interface();
-        storage.setEndpoint("jail://global:00");
-        storage.setType(Interfacetype.WRITE);
-        cluster.getInterfaces().getInterfaces().add(storage);
-
-        return cluster;
-    }
-
-    public static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups,
-                                 Storage.TYPE storageType, String uriTemplate) {
-        Feed feed = new Feed();
-        feed.setName(feedName);
-        feed.setTags(tags);
-        feed.setGroups(groups);
-        feed.setFrequency(Frequency.fromString("hours(1)"));
-
-        org.apache.falcon.entity.v0.feed.Clusters
-                clusters = new org.apache.falcon.entity.v0.feed.Clusters();
-        feed.setClusters(clusters);
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
-                new org.apache.falcon.entity.v0.feed.Cluster();
-        feedCluster.setName(cluster.getName());
-        clusters.getClusters().add(feedCluster);
-
-        addStorage(feed, storageType, uriTemplate);
-
-        return feed;
-    }
-
     public static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
         if (storageType == Storage.TYPE.FILESYSTEM) {
             Locations locations = new Locations();
@@ -546,63 +500,17 @@ public class LineageMetadataResourceTest {
         }
     }
 
-    public static Process buildProcess(String processName, Cluster cluster,
-                                       String tags) throws Exception {
-        Process processEntity = new Process();
-        processEntity.setName(processName);
-        processEntity.setTags(tags);
-
-        org.apache.falcon.entity.v0.process.Cluster processCluster =
-                new org.apache.falcon.entity.v0.process.Cluster();
-        processCluster.setName(cluster.getName());
-        processEntity.setClusters(new Clusters());
-        processEntity.getClusters().getClusters().add(processCluster);
-
-        return processEntity;
-    }
-
-    public static void addWorkflow(Process process, String workflowName, String version) {
-        Workflow workflow = new Workflow();
-        workflow.setName(workflowName);
-        workflow.setVersion(version);
-        workflow.setEngine(EngineType.PIG);
-        workflow.setPath("/falcon/test/workflow");
-
-        process.setWorkflow(workflow);
-    }
-
-    public static void addInput(Process process, Feed feed) {
-        if (process.getInputs() == null) {
-            process.setInputs(new Inputs());
-        }
-
-        Inputs inputs = process.getInputs();
-        Input input = new Input();
-        input.setFeed(feed.getName());
-        inputs.getInputs().add(input);
-    }
-
-    public static void addOutput(Process process, Feed feed) {
-        if (process.getOutputs() == null) {
-            process.setOutputs(new Outputs());
-        }
-
-        Outputs outputs = process.getOutputs();
-        Output output = new Output();
-        output.setFeed(feed.getName());
-        outputs.getOutputs().add(output);
-    }
-
     public void addProcessEntity() throws Exception {
-        Process processEntity = buildProcess(PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical");
-        addWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
+        Process processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME,
+                clusterEntity, "classified-as=Critical");
+        EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
 
         for (Feed inputFeed : inputFeeds) {
-            addInput(processEntity, inputFeed);
+            EntityBuilderTestUtil.addInput(processEntity, inputFeed);
         }
 
         for (Feed outputFeed : outputFeeds) {
-            addOutput(processEntity, outputFeed);
+            EntityBuilderTestUtil.addOutput(processEntity, outputFeed);
         }
 
         configStore.publish(EntityType.PROCESS, processEntity);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/adca0057/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java b/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
new file mode 100644
index 0000000..787e528
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.security;
+
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Test for FalconAuthenticationFilter using mock objects.
+ *
+ * <p>The servlet request, response, chain and filter config are Mockito mocks that are
+ * created once per class and shared by all tests. Several tests also change global state
+ * (the "user.name" system property, StartupProperties and the UserGroupInformation
+ * configuration).
+ */
+public class FalconAuthenticationFilterTest {
+
+    @Mock
+    private HttpServletRequest mockRequest;
+
+    @Mock
+    private HttpServletResponse mockResponse;
+
+    @Mock
+    private FilterChain mockChain;
+
+    @Mock
+    private FilterConfig mockConfig;
+
+    // NOTE(review): not referenced by any test in this class.
+    @Mock
+    private UserGroupInformation mockUgi;
+
+    /**
+     * Creates the {@code @Mock} fields once for the whole class.
+     */
+    @BeforeClass
+    public void init() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    /**
+     * Re-stubs the shared mocks before every test: the filter config exposes the init
+     * parameters type=simple, an empty config.prefix.type and anonymous.allowed=true, and
+     * the request reports the OPTIONS method and the URL http://localhost.
+     */
+    @BeforeMethod
+    private void initAuthType() {
+        ConcurrentHashMap<String, String> conf = new ConcurrentHashMap<String, String>();
+        conf.put("type", "simple");
+        conf.put("config.prefix.type", "");
+        conf.put("anonymous.allowed", "true");
+        Mockito.when(mockConfig.getInitParameterNames()).thenReturn(conf.keys());
+
+        for (Map.Entry<String, String> entry : conf.entrySet()) {
+            Mockito.when(mockConfig.getInitParameter(entry.getKey())).thenReturn(entry.getValue());
+        }
+
+        Mockito.when(mockRequest.getMethod()).thenReturn("OPTIONS");
+
+        StringBuffer requestUrl = new StringBuffer("http://localhost");
+        Mockito.when(mockRequest.getRequestURL()).thenReturn(requestUrl);
+    }
+
+    /**
+     * Runs the filter for a user passed in the query string ("guest") and for a user
+     * reported by the request as remote user ("testuser"), checking the current user
+     * after each pass.
+     *
+     * <p>NOTE(review): the expected user is also set directly via
+     * {@code CurrentUser.authenticate} right before each {@code doFilter} call, so the
+     * assertions may hold independently of the filter - confirm.
+     */
+    @Test
+    public void testDoFilter() throws Exception {
+        Filter filter = new FalconAuthenticationFilter();
+        // presumably serializes filter initialization against other users of the
+        // startup properties - TODO confirm
+        synchronized (StartupProperties.get()) {
+            filter.init(mockConfig);
+        }
+
+        CurrentUser.authenticate("nouser");
+        Assert.assertEquals(CurrentUser.getUser(), "nouser");
+
+        CurrentUser.authenticate("guest");
+        Mockito.when(mockRequest.getQueryString()).thenReturn("user.name=guest");
+        filter.doFilter(mockRequest, mockResponse, mockChain);
+        Assert.assertEquals(CurrentUser.getUser(), "guest");
+
+        CurrentUser.authenticate("nouser");
+        Assert.assertEquals(CurrentUser.getUser(), "nouser");
+        CurrentUser.authenticate("testuser");
+        Mockito.when(mockRequest.getRemoteUser()).thenReturn("testuser");
+        filter.doFilter(mockRequest, mockResponse, mockChain);
+        Assert.assertEquals(CurrentUser.getUser(), "testuser");
+    }
+
+    /**
+     * Runs the filter with only the remote user ("testuser") stubbed on the request and
+     * checks that the current user is "testuser" afterwards.
+     */
+    @Test
+    public void testAnonymous() throws Exception {
+        Filter filter = new FalconAuthenticationFilter();
+
+        synchronized (StartupProperties.get()) {
+            filter.init(mockConfig);
+        }
+
+        CurrentUser.authenticate("nouser");
+        Assert.assertEquals(CurrentUser.getUser(), "nouser");
+
+        CurrentUser.authenticate("testuser");
+        Mockito.when(mockRequest.getRemoteUser()).thenReturn("testuser");
+        filter.doFilter(mockRequest, mockResponse, mockChain);
+        Assert.assertEquals(CurrentUser.getUser(), "testuser");
+    }
+
+    /**
+     * Runs the filter for a POST request that carries no user at all: empty "user.name"
+     * system property, empty query string and no remote user.
+     *
+     * <p>NOTE(review): nothing is asserted or verified on {@code errorResponse}; the test
+     * only checks that {@code doFilter} completes without throwing.
+     */
+    @Test
+    public void testEmptyUser() throws Exception {
+        Filter filter = new FalconAuthenticationFilter();
+
+        synchronized (StartupProperties.get()) {
+            filter.init(mockConfig);
+        }
+
+        final String userName = System.getProperty("user.name");
+        try {
+            System.setProperty("user.name", "");
+
+            Mockito.when(mockRequest.getMethod()).thenReturn("POST");
+            Mockito.when(mockRequest.getQueryString()).thenReturn("");
+            Mockito.when(mockRequest.getRemoteUser()).thenReturn(null);
+
+            HttpServletResponse errorResponse = Mockito.mock(HttpServletResponse.class);
+            filter.doFilter(mockRequest, errorResponse, mockChain);
+        } finally {
+            // restore the JVM-wide property for the remaining tests
+            System.setProperty("user.name", userName);
+        }
+    }
+
+    /**
+     * Configures RemoteUserInHeaderBasedAuthenticationHandler as the HTTP authentication
+     * type and checks that the user named in the "Remote-User" request header becomes
+     * the current user.
+     */
+    @Test
+    public void testDoFilterForClientBackwardsCompatibility() throws Exception {
+        Filter filter = new FalconAuthenticationFilter();
+
+        final String userName = System.getProperty("user.name");
+        final String httpAuthType =
+                StartupProperties.get().getProperty("falcon.http.authentication.type", "simple");
+        try {
+            System.setProperty("user.name", "");
+            StartupProperties.get().setProperty("falcon.http.authentication.type",
+                    "org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler");
+
+            // the filter is initialized only after the authentication type was switched
+            synchronized (StartupProperties.get()) {
+                filter.init(mockConfig);
+            }
+
+            Mockito.when(mockRequest.getMethod()).thenReturn("POST");
+            Mockito.when(mockRequest.getQueryString()).thenReturn("");
+            Mockito.when(mockRequest.getRemoteUser()).thenReturn(null);
+            Mockito.when(mockRequest.getHeader("Remote-User")).thenReturn("remote-user");
+
+            filter.doFilter(mockRequest, mockResponse, mockChain);
+
+            Assert.assertEquals(CurrentUser.getUser(), "remote-user");
+
+        } finally {
+            // undo both global changes made above
+            System.setProperty("user.name", userName);
+            StartupProperties.get().setProperty("falcon.http.authentication.type", httpAuthType);
+        }
+    }
+
+    /**
+     * With Hadoop security set to kerberos, checks that the filter configuration reports
+     * the Kerberos principal with "_HOST" replaced by the local host name.
+     *
+     * <p>NOTE(review): the kerberos UserGroupInformation configuration set here is not
+     * reset by this test - confirm that later tests do not depend on the previous value.
+     */
+    @Test
+    public void testGetKerberosPrincipalWithSubstitutedHostSecure() throws Exception {
+        String principal = StartupProperties.get().getProperty(FalconAuthenticationFilter.KERBEROS_PRINCIPAL);
+
+        String expectedPrincipal = "falcon/" + SecurityUtil.getLocalHostName() + "@Example.com";
+        try {
+            Configuration conf = new Configuration(false);
+            conf.set("hadoop.security.authentication", "kerberos");
+            UserGroupInformation.setConfiguration(conf);
+            Assert.assertTrue(UserGroupInformation.isSecurityEnabled());
+
+            StartupProperties.get().setProperty(
+                    FalconAuthenticationFilter.KERBEROS_PRINCIPAL, "falcon/_HOST@Example.com");
+            FalconAuthenticationFilter filter = new FalconAuthenticationFilter();
+            Properties properties = filter.getConfiguration(FalconAuthenticationFilter.FALCON_PREFIX, null);
+            Assert.assertEquals(
+                    properties.get(KerberosAuthenticationHandler.PRINCIPAL), expectedPrincipal);
+        } finally {
+            // put back the principal that was configured before the test
+            StartupProperties.get().setProperty(FalconAuthenticationFilter.KERBEROS_PRINCIPAL, principal);
+        }
+    }
+
+    /**
+     * With Hadoop security set to simple, checks that the filter configuration reports
+     * the Kerberos principal exactly as it is stored in the startup properties.
+     */
+    @Test
+    public void testGetKerberosPrincipalWithSubstitutedHostNonSecure() throws Exception {
+        String principal = StartupProperties.get().getProperty(FalconAuthenticationFilter.KERBEROS_PRINCIPAL);
+        Configuration conf = new Configuration(false);
+        conf.set("hadoop.security.authentication", "simple");
+        UserGroupInformation.setConfiguration(conf);
+        Assert.assertFalse(UserGroupInformation.isSecurityEnabled());
+
+        FalconAuthenticationFilter filter = new FalconAuthenticationFilter();
+        Properties properties = filter.getConfiguration(FalconAuthenticationFilter.FALCON_PREFIX, null);
+        Assert.assertEquals(properties.get(KerberosAuthenticationHandler.PRINCIPAL), principal);
+    }
+}


Mime
View raw message