falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-2190 server side checks to not let any entity operations on entities generated by extensions
Date Thu, 15 Dec 2016 04:40:28 GMT
Repository: falcon
Updated Branches:
  refs/heads/master ffda98cc1 -> 617d5ab94


FALCON-2190 server side checks to not let any entity operations on entities generated by extensions

Author: sandeep <sandysmdl@gmail.com>

Reviewers: @pallavi-rao

Closes #316 from sandeepSamudrala/FALCON-2190 and squashes the following commits:

57170c4 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190
acf2734 [sandeep] FALCON-2190 Incorporated review comments
d343ea3 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190
07c7dd7 [sandeep] FALCON-2190 Incorporated review comments
ca6fbb1 [sandeep] FALCON-2190 Incorporated review comments
94c93c4 [sandeep] FALCON-2190 Incorporated review comments
7873b05 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190
e7c438a [sandeep] FALCON-2190 Fixed tags issue with extension being null for usual entities
17a15c8 [sandeep] FALCON-2190 Fixed check style issues
4c28d1c [sandeep] FALCON-2190 server side checks to not let any entity operations on entities generated by extensions
f701317 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190
fd2357b [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190
8aacd75 [sandeep] FALCON-2183 Incorporated review comments
f3d7268 [sandeep] FALCON-2183 Incorporated review comments
11e7b3f [sandeep] FALCON-2183 Extension Builder changes to support new user extensions
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/617d5ab9
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/617d5ab9
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/617d5ab9

Branch: refs/heads/master
Commit: 617d5ab9492cb8c8a07fda0411a9a7826cff091c
Parents: ffda98c
Author: sandeep <sandysmdl@gmail.com>
Authored: Thu Dec 15 10:10:03 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Thu Dec 15 10:10:03 2016 +0530

----------------------------------------------------------------------
 .../falcon/client/AbstractFalconClient.java     |  2 +-
 .../apache/falcon/entity/CatalogStorage.java    |  2 +-
 .../extensions/jdbc/ExtensionMetaStore.java     | 17 +++++
 .../resource/extensions/ExtensionManager.java   |  4 +-
 .../proxy/SchedulableEntityManagerProxy.java    | 68 ++++++++++++++++----
 5 files changed, 78 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/617d5ab9/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index fc6bc14..3dabf52 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -521,7 +521,7 @@ public abstract class AbstractFalconClient {
         try {
             stream = new FileInputStream(filePath);
         } catch (FileNotFoundException e) {
-            throw new FalconCLIException("File not found:", e);
+            throw new FalconCLIException("File not found:" + filePath, e);
         }
         return stream;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/617d5ab9/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 31feee8..4633796 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -57,7 +57,7 @@ import java.util.regex.Matcher;
  */
 public class CatalogStorage extends Configured implements Storage {
 
-    private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CatalogStorage.class);
 
     // constants to be used while preparing HCatalog partition filter query
     private static final String FILTER_ST_BRACKET = "(";

http://git-wip-us.apache.org/repos/asf/falcon/blob/617d5ab9/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
index 882582f..4250e15 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
@@ -75,6 +75,23 @@ public class ExtensionMetaStore {
         return false;
     }
 
+    public Boolean checkIfExtensionJobExists(String jobName) {
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION_JOB);
+        q.setParameter(JOB_NAME, jobName);
+        int resultSize = 0;
+        try {
+            resultSize = q.getResultList().size();
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+        if (resultSize > 0){
+            return true;
+        }
+        return false;
+    }
+
     public List<ExtensionBean> getAllExtensions() {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);

http://git-wip-us.apache.org/repos/asf/falcon/blob/617d5ab9/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
index 9a7daa5..7c30c83 100644
--- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
@@ -331,7 +331,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
             LOG.error("Error while submitting extension job: ", e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
         }
-        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName);
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully:" + jobName);
     }
 
     private Map<EntityType, List<Entity>> getEntityList(String extensionName, String jobName,
@@ -725,7 +725,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
         return groupedEntities;
     }
 
-    private String getJobNameFromTag(String tags) {
+    public static String getJobNameFromTag(String tags) {
         int nameStart = tags.indexOf(TAG_PREFIX_EXTENSION_JOB);
         if (nameStart == -1) {
             return null;

http://git-wip-us.apache.org/repos/asf/falcon/blob/617d5ab9/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 07334d6..316567e 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -27,6 +27,8 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
+import org.apache.falcon.extensions.store.ExtensionStore;
 import org.apache.falcon.monitors.Dimension;
 import org.apache.falcon.monitors.Monitored;
 import org.apache.falcon.resource.APIResult;
@@ -37,6 +39,7 @@ import org.apache.falcon.resource.FeedLookupResult;
 import org.apache.falcon.resource.SchedulableEntityInstanceResult;
 import org.apache.falcon.resource.channel.Channel;
 import org.apache.falcon.resource.channel.ChannelFactory;
+import org.apache.falcon.resource.extensions.ExtensionManager;
 import org.apache.falcon.util.DeploymentUtil;
 
 import javax.servlet.http.HttpServletRequest;
@@ -163,6 +166,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
         Map<String, APIResult> results = new HashMap<String, APIResult>();
         final Set<String> colos = getApplicableColos(type, entity);
 
+        entityHasExtensionJobTag(entity);
         validateEntity(entity, colos);
 
         results.put(FALCON_TAG, new EntityProxy(type, entity.getName()) {
@@ -246,7 +250,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
      * Delete the specified entity.
      * @param request Servlet Request
      * @param type Valid options are cluster, feed or process.
-     * @param entity Name of the entity.
+     * @param entityName Name of the entity.
      * @param ignore colo is ignored
      * @return Results of the delete operation.
      */
@@ -257,21 +261,26 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     @Override
     public APIResult delete(
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
-            @Dimension("entityName") @PathParam("entity") final String entity,
+            @Dimension("entityName") @PathParam("entity") final String entityName,
             @Dimension("colo") @QueryParam("colo") String ignore) {
 
+        try {
+            isEntityPartOfAnExtension(EntityUtil.getEntity(type, entityName));
+        } catch (FalconException e) {
+            throw FalconWebException.newAPIException(e);
+        }
         final HttpServletRequest bufferedRequest = new BufferedRequest(request);
         Map<String, APIResult> results = new HashMap<String, APIResult>();
 
-        results.put(FALCON_TAG, new EntityProxy(type, entity) {
+        results.put(FALCON_TAG, new EntityProxy(type, entityName) {
             @Override
             public APIResult execute() {
                 try {
-                    EntityUtil.getEntity(type, entity);
+                    EntityUtil.getEntity(type, entityName);
                     return super.execute();
                 } catch (EntityNotRegisteredException e) {
                     return new APIResult(APIResult.Status.SUCCEEDED,
-                            entity + "(" + type + ") doesn't exist. Nothing to do");
+                            entityName + "(" + type + ") doesn't exist. Nothing to do");
                 } catch (FalconException e) {
                     throw FalconWebException.newAPIException(e);
                 }
@@ -279,13 +288,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entity, colo);
+                return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo);
             }
         }.execute());
 
         // delete only if deleted from everywhere
         if (!embeddedMode && results.get(FALCON_TAG).getStatus() == APIResult.Status.SUCCEEDED) {
-            results.put(PRISM_TAG, super.delete(bufferedRequest, type, entity, currentColo));
+            results.put(PRISM_TAG, super.delete(bufferedRequest, type, entityName, currentColo));
         }
         return consolidateResult(results, APIResult.class);
     }
@@ -310,9 +319,16 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
             @Dimension("colo") @QueryParam("colo") String ignore,
             @QueryParam("skipDryRun") final Boolean skipDryRun) {
 
+        try {
+            isEntityPartOfAnExtension(EntityUtil.getEntity(type, entityName));
+        } catch (FalconException e) {
+            throw FalconWebException.newAPIException(e);
+        }
         final HttpServletRequest bufferedRequest = new BufferedRequest(request);
+        Entity newEntity = getEntity(bufferedRequest, type);
+        entityHasExtensionJobTag(newEntity);
         final Set<String> oldColos = getApplicableColos(type, entityName);
-        final Set<String> newColos = getApplicableColos(type, getEntity(bufferedRequest, type));
+        final Set<String> newColos = getApplicableColos(type, newEntity);
         final Set<String> mergedColos = new HashSet<String>();
         mergedColos.addAll(oldColos);
         mergedColos.retainAll(newColos);    //Common colos where update should be called
@@ -378,6 +394,34 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
         return consolidateResult(results, APIResult.class);
     }
 
+    private void isEntityPartOfAnExtension(Entity entity) {
+        String tags = entity.getTags();
+        checkExtensionJobExist(tags);
+    }
+
+
+    private void entityHasExtensionJobTag(Entity entity) {
+        String tags = entity.getTags();
+        if (StringUtils.isNotBlank(tags)) {
+            String jobName = ExtensionManager.getJobNameFromTag(tags);
+            if (StringUtils.isNotBlank(jobName)) {
+                throw FalconWebException.newAPIException("Entity has extension job name in the tag. Such entities need "
+                        + "to be submitted as extension jobs:" + jobName);
+            }
+        }
+    }
+
+    private void checkExtensionJobExist(String tags) {
+        if (tags != null) {
+            String jobName = ExtensionManager.getJobNameFromTag(tags);
+            ExtensionMetaStore extensionMetaStore = ExtensionStore.getMetaStore();
+            if (jobName != null && extensionMetaStore.checkIfExtensionJobExists(jobName)) {
+                throw FalconWebException.newAPIException("Entity operation is not allowed on this entity as it is"
+                        + "part of an extension job:" + jobName);
+            }
+        }
+    }
+
     /**
      * Updates the dependent entities of a cluster in workflow engine.
      * @param clusterName Name of cluster.
@@ -575,16 +619,18 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
             @QueryParam("skipDryRun") Boolean skipDryRun,
             @QueryParam("properties") String properties) {
         BufferedRequest bufferedRequest = new BufferedRequest(request);
-        String entity = getEntity(bufferedRequest, type).getName();
+        final Entity entity = getEntity(bufferedRequest, type);
+        String entityName = entity.getName();
+        entityHasExtensionJobTag(entity);
         Map<String, APIResult> results = new HashMap<String, APIResult>();
         results.put("submit", submit(bufferedRequest, type, coloExpr));
-        results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr, skipDryRun, properties));
+        results.put("schedule", schedule(bufferedRequest, type, entityName, coloExpr, skipDryRun, properties));
         return consolidateResult(results, APIResult.class);
     }
 
     /**
      * Suspend an entity.
-     * @param request Servlet Request
+     * @param request Servlet Requests
      * @param type Valid options are feed or process.
      * @param entity Name of the entity.
      * @param coloExpr Colo on which the query should be run.


Mime
View raw message