falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-2194 Enhanced validate API to support config validation for user extensions
Date Mon, 12 Dec 2016 04:21:18 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 3e7e08f77 -> 2f1aa291f


FALCON-2194 Enhanced validate API to support config validation for user extensions

Author: sandeep <sandysmdl@gmail.com>

Reviewers: @pallavi-rao

Closes #314 from sandeepSamudrala/FALCON-2194 and squashes the following commits:

0313041 [sandeep] FALCON-2194 Incorporated Review comments
61a463c [sandeep] FALCON-2194 Enhancing validate API to support config validation for user extensions
8d58ef6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2194
53c680b [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2194
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2f1aa291
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2f1aa291
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2f1aa291

Branch: refs/heads/master
Commit: 2f1aa291ffd1e62c8186638e62115c0df0c393a5
Parents: 3e7e08f
Author: sandeep <sandysmdl@gmail.com>
Authored: Mon Dec 12 09:51:22 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Mon Dec 12 09:51:22 2016 +0530

----------------------------------------------------------------------
 .../apache/falcon/cli/FalconExtensionCLI.java   |  2 +-
 .../org/apache/falcon/ExtensionHandler.java     | 29 +++++++
 .../falcon/client/AbstractFalconClient.java     |  4 +-
 .../org/apache/falcon/client/FalconClient.java  | 79 +++++++++++---------
 .../resource/extensions/ExtensionManager.java   |  4 +
 .../apache/falcon/unit/FalconUnitClient.java    |  2 +-
 6 files changed, 79 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 984b6a3..57871c3 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -113,7 +113,7 @@ public class FalconExtensionCLI {
         } else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
             validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
-            result = client.validateExtensionJob(extensionName, filePath, doAsUser).getMessage();
+            result = client.validateExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
             validateRequiredParameter(jobName, JOB_NAME_OPT);
             result = client.scheduleExtensionJob(jobName, doAsUser).getMessage();

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/client/src/main/java/org/apache/falcon/ExtensionHandler.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
index c18a7cc..8168b23 100644
--- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java
+++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
@@ -27,6 +27,8 @@ import org.apache.falcon.extensions.ExtensionBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +44,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.ServiceLoader;
 
+import static org.apache.falcon.client.FalconClient.OUT;
+
 /**
  * Handler class that is responsible for preparing Extension entities.
  */
@@ -50,6 +54,8 @@ public final class ExtensionHandler {
     public static final Logger LOG = LoggerFactory.getLogger(ExtensionHandler.class);
     private static final String UTF_8 = CharEncoding.UTF_8;
     private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
+    private static final String LOCATION = "location";
+    private static final String TYPE = "type";
 
     public List<Entity> getEntities(ClassLoader extensionClassloader, String extensionName,
String jobName,
                                            InputStream configStream) throws IOException,
FalconException {
@@ -186,4 +192,27 @@ public final class ExtensionHandler {
         urls.add(fileURL);
         return urls;
     }
+
+
+    public static String getExtensionLocation(String extensionName, JSONObject extensionDetailJson)
{
+        String extensionBuildPath;
+        try {
+            extensionBuildPath = extensionDetailJson.get(LOCATION).toString();
+        } catch (JSONException e) {
+            OUT.get().print("Error. " + extensionName + " not found ");
+            throw new FalconCLIException("Failed to get extension type for the given extension");
+        }
+        return extensionBuildPath;
+    }
+
+    public static  String getExtensionType(String extensionName, JSONObject extensionDetailJson)
{
+        String extensionType;
+        try {
+            extensionType = extensionDetailJson.get(TYPE).toString();
+        } catch (JSONException e) {
+            OUT.get().print("Error. " + extensionName + " not found ");
+            throw new FalconCLIException("Failed to get extension type for the given extension");
+        }
+        return extensionType;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index e9a10fd..9659cfc 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -197,7 +197,7 @@ public abstract class AbstractFalconClient {
     public abstract String unregisterExtension(String extensionName);
 
     /**
-     * Prepare set of entities the extension has implemented and stage them to a local directory
and submit them too.
+     * Prepares set of entities the extension has implemented and stage them to a local directory
and submit them too.
      * @param extensionName extension which is available in the store.
      * @param jobName name to be used in all the extension entities' tagging that are built
as part of
      *                           loadAndPrepare.
@@ -209,7 +209,7 @@ public abstract class AbstractFalconClient {
                                                 String doAsUser);
 
     /**
-     * Prepare set of entities the extension has implemented and stage them to a local directory
and submits and
+     * Prepares set of entities the extension has implemented and stage them to a local directory
and submits and
      * schedules them.
      * @param extensionName extension which is available in the store.
      * @param jobName name to be used in all the extension entities' tagging that are built
as part of

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 9820686..7fa5330 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -23,8 +23,6 @@ import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.client.urlconnection.HTTPSProperties;
-import com.sun.jersey.core.header.FormDataContentDisposition;
-import com.sun.jersey.multipart.FormDataBodyPart;
 import com.sun.jersey.multipart.FormDataMultiPart;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -141,7 +139,9 @@ public class FalconClient extends AbstractFalconClient {
             return true;
         }
     };
-    private static final String TAG_SEPARATOR = ",";
+    private static final String FEEDS = "feeds";
+    private static final String PROCESSES = "processes";
+    private static final String CONFIG = "config";
     private final WebResource service;
     private final AuthenticatedURL.Token authenticationToken;
 
@@ -1073,16 +1073,16 @@ public class FalconClient extends AbstractFalconClient {
         List<Entity> entities = validateExtensionAndGetEntities(extensionName, jobName,
configStream);
         FormDataMultiPart formDataMultiPart = new FormDataMultiPart();
 
-        for (Entity entity : entities) {
-            if (EntityType.FEED.equals(entity.getEntityType())) {
-                formDataMultiPart.field("feeds", entity, MediaType.APPLICATION_XML_TYPE);
-            } else if (EntityType.PROCESS.equals(entity.getEntityType())) {
-                formDataMultiPart.field("processes", entity, MediaType.APPLICATION_XML_TYPE);
+        if (entities != null && !entities.isEmpty()) {
+            for (Entity entity : entities) {
+                if (EntityType.FEED.equals(entity.getEntityType())) {
+                    formDataMultiPart.field(FEEDS, entity, MediaType.APPLICATION_XML_TYPE);
+                } else if (EntityType.PROCESS.equals(entity.getEntityType())) {
+                    formDataMultiPart.field(PROCESSES, entity, MediaType.APPLICATION_XML_TYPE);
+                }
             }
         }
-
-        formDataMultiPart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("config").build(),
configStream,
-                MediaType.APPLICATION_OCTET_STREAM_TYPE));
+        formDataMultiPart.field(CONFIG, configStream, MediaType.APPLICATION_OCTET_STREAM_TYPE);
         try {
             formDataMultiPart.close();
         } catch (IOException e) {
@@ -1094,30 +1094,28 @@ public class FalconClient extends AbstractFalconClient {
 
     private List<Entity> validateExtensionAndGetEntities(String extensionName, String
jobName,
                                                          InputStream configStream) {
-        ClientResponse clientResponse = getExtensionDetailResponse(extensionName);
-        List<Entity> entities = getEntities(extensionName, jobName, configStream, clientResponse);
+        JSONObject extensionDetailJson = getExtensionDetailJson(extensionName);
+        String extensionType = ExtensionHandler.getExtensionType(extensionName, extensionDetailJson);
+        String extensionBuildLocation = ExtensionHandler.getExtensionLocation(extensionName,
extensionDetailJson);
+        List<Entity> entities = getEntities(extensionName, jobName, configStream, extensionType,
+                extensionBuildLocation);
         return entities;
     }
 
-    private List<Entity> getEntities(String extensionName, String jobName, InputStream
configStream,
-                                           ClientResponse clientResponse) {
-        JSONObject responseJson;
-        try {
-            responseJson = new JSONObject(clientResponse.getEntity(String.class));
-        } catch (JSONException e) {
-            OUT.get().print("Submit failed. Failed to get details for the given extension");
-            throw new FalconCLIException("Submit failed. Failed to get details for the given
extension");
-        }
-        String extensionType;
-        String extensionBuildLocation;
+    private JSONObject getExtensionDetailJson(String extensionName) {
+        ClientResponse clientResponse = getExtensionDetailResponse(extensionName);
+        JSONObject extensionDetailJson;
         try {
-            extensionType = responseJson.get("type").toString();
-            extensionBuildLocation = responseJson.get("location").toString();
+            extensionDetailJson = new JSONObject(clientResponse.getEntity(String.class));
         } catch (JSONException e) {
-            OUT.get().print("Error. " + extensionName + " not found ");
-            throw new FalconCLIException("Submit failed. Failed to get details for the given
extension");
+            OUT.get().print("Failed to get details for the given extension");
+            throw new FalconCLIException("Failed to get details for the given extension");
         }
+        return extensionDetailJson;
+    }
 
+    private List<Entity> getEntities(String extensionName, String jobName, InputStream
configStream,
+                                     String extensionType, String extensionBuildLocation)
{
         List<Entity> entities = null;
         if (!extensionType.equals(ExtensionType.CUSTOM.name())) {
             try {
@@ -1125,11 +1123,11 @@ public class FalconClient extends AbstractFalconClient {
                         extensionBuildLocation);
             } catch (Exception e) {
                 OUT.get().println("Error in building the extension");
-                throw new FalconCLIException("Submit failed. Failed to get details for the
given extension");
+                throw new FalconCLIException("Failed to prepare entities for the given extension");
             }
             if (entities == null || entities.isEmpty()) {
                 OUT.get().println("No entities got built");
-                throw new FalconCLIException("Submit failed. Failed to get details for the
given extension");
+                throw new FalconCLIException("Failed to prepare entities for the given extension");
             }
         }
         return entities;
@@ -1154,13 +1152,20 @@ public class FalconClient extends AbstractFalconClient {
         return getResponse(APIResult.class, clientResponse);
     }
 
-    public APIResult validateExtensionJob(final String extensionName, final String filePath,
final String doAsUser) {
-        InputStream entityStream = getServletInputStream(filePath);
-        ClientResponse clientResponse = new ResourceBuilder()
-                .path(ExtensionOperations.VALIDATE.path, extensionName)
-                .addQueryParam(DO_AS_OPT, doAsUser)
-                .call(ExtensionOperations.VALIDATE, entityStream);
-        return getResponse(APIResult.class, clientResponse);
+    public APIResult validateExtensionJob(final String extensionName, final String jobName,
+                                          final String configPath, final String doAsUser)
{
+        String extensionType = ExtensionHandler.getExtensionType(extensionName, getExtensionDetailJson(extensionName));
+        InputStream configStream = getServletInputStream(configPath);
+        if (ExtensionType.TRUSTED.name().equalsIgnoreCase(extensionType)) {
+            ClientResponse clientResponse = new ResourceBuilder()
+                    .path(ExtensionOperations.VALIDATE.path, extensionName)
+                    .addQueryParam(DO_AS_OPT, doAsUser)
+                    .call(ExtensionOperations.VALIDATE, configStream);
+            return getResponse(APIResult.class, clientResponse);
+        } else {
+            validateExtensionAndGetEntities(extensionName, jobName, configStream);
+            return new APIResult(APIResult.Status.SUCCEEDED, "Validated successfully");
+        }
     }
 
     public APIResult scheduleExtensionJob(final String jobName, final String doAsUser)  {

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
index 241af31..47ee737 100644
--- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
@@ -487,6 +487,10 @@ public class ExtensionManager extends AbstractSchedulableEntityManager
{
             @Context HttpServletRequest request,
             @DefaultValue("") @QueryParam("doAs") String doAsUser) {
         checkIfExtensionServiceIsEnabled();
+        ExtensionType extensionType = getExtensionType(extensionName);
+        if (!ExtensionType.TRUSTED.equals(extensionType)) {
+            throw FalconWebException.newAPIException("Extension validation is supported only
for trusted extensions");
+        }
         try {
             List<Entity> entities = generateEntities(extensionName, request.getInputStream());
             for (Entity entity : entities) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/2f1aa291/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 0eb0ab3..4da9f73 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -316,7 +316,7 @@ public class FalconUnitClient extends AbstractFalconClient {
             entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream,
                     packagePath);
         } catch (FalconException | IOException e) {
-            throw new FalconCLIException("Failed in generating entties" + jobName);
+            throw new FalconCLIException("Failed in generating entities" + jobName);
         }
         return entities;
     }


Mime
View raw message