falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-2185 Falcon Client changes for Falcon user extensions.
Date Fri, 25 Nov 2016 10:25:30 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 1b7708fa1 -> 3f5087997


FALCON-2185 Falcon Client changes for Falcon user extensions.

Author: sandeep <sandysmdl@gmail.com>

Reviewers: @pallavi-rao

Closes #306 from sandeepSamudrala/FALCON-2185 and squashes the following commits:

466705f [sandeep] FALCON-2185 Incorporated review comments.Made stage entities private method
2a3e61d [sandeep] FALCON-2185 Incorporated more review comments
e3516c2 [sandeep] FALCON-2185 Incorporated review comments
cfe6c57 [sandeep] FALCON-2185 Moved UTs to falcon unit and example to extensions
ebac5bb [sandeep] FALCON-2185 Falcon Client changes for Falcon user extensions
d680244 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2185
8b2e0d9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2185
2fd05bb [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2185
fc7e9a1 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2185
8aacd75 [sandeep] FALCON-2183 Incorporated review comments
f3d7268 [sandeep] FALCON-2183 Incorporated review comments
11e7b3f [sandeep] FALCON-2183 Extension Builder changes to support new user extensions
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3f508799
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3f508799
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3f508799

Branch: refs/heads/master
Commit: 3f50879971453f4ced61fa24c4f4a425cbf2631e
Parents: 1b7708f
Author: sandeep <sandysmdl@gmail.com>
Authored: Fri Nov 25 15:54:56 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Fri Nov 25 15:54:56 2016 +0530

----------------------------------------------------------------------
 .../apache/falcon/cli/FalconExtensionCLI.java   |  32 ++--
 client/pom.xml                                  |   4 +
 .../org/apache/falcon/ExtensionClassLoader.java |  61 ++++++
 .../org/apache/falcon/ExtensionHandler.java     | 189 +++++++++++++++++++
 .../falcon/client/AbstractFalconClient.java     |  12 ++
 .../org/apache/falcon/client/FalconClient.java  |  50 ++++-
 .../falcon/client/FalconExtensionConstants.java |  34 ++++
 .../org/apache/falcon/entity/EntityUtil.java    |  46 +++++
 .../site/twiki/falconcli/ExtensionSubmit.twiki  |   2 +-
 .../falconcli/ExtensionSubmitAndSchedule.twiki  |   2 +-
 .../org/apache/falcon/ExtensionExample.java     |  66 +++++++
 ...rg.apache.falcon.extensions.ExtensionBuilder |  18 ++
 extensions/src/test/resources/process.xml       |  59 ++++++
 .../resource/extensions/ExtensionManager.java   |  40 +---
 unit/pom.xml                                    |  23 +++
 .../apache/falcon/unit/FalconUnitClient.java    |   6 +
 .../apache/falcon/ExtensionClassLoaderTest.java |  55 ++++++
 .../org/apache/falcon/ExtensionUtilTest.java    |  66 +++++++
 unit/src/test/resources/extension.properties    |  23 +++
 19 files changed, 728 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 83b550f..15b1b32 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -54,7 +54,7 @@ public class FalconExtensionCLI {
     public static final String REGISTER_OPT = "register";
 
     // Input parameters
-    public static final String ENTENSION_NAME_OPT = "extensionName";
+    public static final String EXTENSION_NAME_OPT = "extensionName";
     public static final String JOB_NAME_OPT = "jobName";
     public static final String DESCRIPTION = "description";
     public static final String PATH = "path";
@@ -69,7 +69,7 @@ public class FalconExtensionCLI {
         }
 
         String result;
-        String extensionName = commandLine.getOptionValue(ENTENSION_NAME_OPT);
+        String extensionName = commandLine.getOptionValue(EXTENSION_NAME_OPT);
         String jobName = commandLine.getOptionValue(JOB_NAME_OPT);
         String filePath = commandLine.getOptionValue(FalconCLIConstants.FILE_PATH_OPT);
         String doAsUser = commandLine.getOptionValue(FalconCLIConstants.DO_AS_OPT);
@@ -80,40 +80,36 @@ public class FalconExtensionCLI {
             result = client.enumerateExtensions();
             result = prettyPrintJson(result);
         } else if (optionsList.contains(DEFINITION_OPT)) {
-            validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             result = client.getExtensionDefinition(extensionName);
             result = prettyPrintJson(result);
         } else if (optionsList.contains(DESCRIBE_OPT)) {
-            validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             result = client.getExtensionDescription(extensionName);
         } else if (optionsList.contains(UNREGISTER_OPT)) {
-            validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             result = client.unregisterExtension(extensionName);
         }else if (optionsList.contains(DETAIL_OPT)) {
-            validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             result = client.getExtensionDetail(extensionName);
         } else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) {
-            validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
-            result = client.submitExtensionJob(extensionName, filePath, doAsUser).getMessage();
+            result = client.submitExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(REGISTER_OPT)) {
-            validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             validateRequiredParameter(path, PATH);
             result = client.registerExtension(extensionName, path, description);
-        }else if (optionsList.contains(FalconCLIConstants.SUBMIT_OPT)) {
-            validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
-            validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
-            result = client.submitExtensionJob(extensionName, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
-            validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
             result = client.submitAndScheduleExtensionJob(extensionName, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
-            validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
             result = client.updateExtensionJob(extensionName, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
-            validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
             result = client.validateExtensionJob(extensionName, filePath, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.SCHEDULE_OPT)) {
@@ -129,7 +125,7 @@ public class FalconExtensionCLI {
             validateRequiredParameter(jobName, JOB_NAME_OPT);
             result = client.deleteExtensionJob(jobName, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.LIST_OPT)) {
-            validateRequiredParameter(extensionName, ENTENSION_NAME_OPT);
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             ExtensionJobList jobs = client.listExtensionJob(extensionName, doAsUser,
                     commandLine.getOptionValue(FalconCLIConstants.SORT_ORDER_OPT),
                     commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT),
@@ -203,7 +199,7 @@ public class FalconExtensionCLI {
         Option doAs = new Option(FalconCLIConstants.DO_AS_OPT, true, "doAs user");
         Option debug = new Option(FalconCLIConstants.DEBUG_OPTION, false,
                 "Use debug mode to see debugging statements on stdout");
-        Option extensionName = new Option(ENTENSION_NAME_OPT, true, "Extension name");
+        Option extensionName = new Option(EXTENSION_NAME_OPT, true, "Extension name");
         Option jobName = new Option(JOB_NAME_OPT, true, "Extension job name");
         Option instanceStatus = new Option(FalconCLIConstants.INSTANCE_STATUS_OPT, true, "Instance status");
         Option sortOrder = new Option(FalconCLIConstants.SORT_ORDER_OPT, true, "asc or desc order for results");

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 9daa998..b8647f9 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -112,6 +112,10 @@
             <artifactId>hive-webhcat-java-client</artifactId>
             <version>${hive.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-extensions</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/src/main/java/org/apache/falcon/ExtensionClassLoader.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/ExtensionClassLoader.java b/client/src/main/java/org/apache/falcon/ExtensionClassLoader.java
new file mode 100644
index 0000000..b1e88c3
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/ExtensionClassLoader.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Class loader for extension packages: serves classes and resources from a list of URLs
+ * (jars that were fetched to the local disk) so that they can be loaded into the JVM.
+ */
+public class ExtensionClassLoader extends URLClassLoader{
+
+    public static final Logger LOG = LoggerFactory.getLogger(ExtensionClassLoader.class);
+
+    /**
+     * Creates a loader over the given classpath entries.
+     * @param urls classpath entries to load classes and resources from.
+     * @param parent class loader to delegate to.
+     */
+    public ExtensionClassLoader(URL[] urls, ClassLoader parent) {
+        super(urls, parent);
+    }
+
+    /**
+     * Builds an ExtensionClassLoader for the given URLs inside a privileged action. The loader that
+     * loaded this class is used as the parent.
+     * @param urls classpath entries of the extension.
+     * @return the newly created class loader.
+     * @throws IOException declared by the signature; nothing in this method raises it directly.
+     */
+    public static ClassLoader load(final List<URL> urls) throws IOException {
+        final ClassLoader parent = ExtensionClassLoader.class.getClassLoader();
+        final java.security.PrivilegedAction<ExtensionClassLoader> factory =
+            new java.security.PrivilegedAction<ExtensionClassLoader>() {
+                @Override
+                public ExtensionClassLoader run() {
+                    URL[] classpath = urls.toArray(new URL[urls.size()]);
+                    return new ExtensionClassLoader(classpath, parent);
+                }
+            };
+        ClassLoader loader = java.security.AccessController.doPrivileged(factory);
+        Object[] classpathForLog = urls.toArray();
+        LOG.info("Created a new ExtensionClassLoader using classpath = {}", Arrays.toString(classpathForLog));
+        return loader;
+    }
+
+    /**
+     * Appends a URL to the search path; simply forwards to the parent implementation.
+     */
+    @Override
+    protected void addURL(URL url) {
+        super.addURL(url);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/src/main/java/org/apache/falcon/ExtensionHandler.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
new file mode 100644
index 0000000..80df791
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon;
+
+import org.apache.commons.codec.CharEncoding;
+import org.apache.falcon.client.FalconCLIException;
+import org.apache.falcon.client.FalconExtensionConstants;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.extensions.ExtensionBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
+
+/**
+ * Handler class that is responsible for preparing Extension entities.
+ */
+public final class ExtensionHandler {
+
+    public static final Logger LOG = LoggerFactory.getLogger(ExtensionHandler.class);
+    private static final String UTF_8 = CharEncoding.UTF_8;
+    private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
+
+    /**
+     * Locates the single {@link ExtensionBuilder} implementation visible through the given class loader,
+     * validates the supplied configuration with it and returns the entities it builds.
+     *
+     * @param extensionClassloader class loader holding the extension classpath. It is installed as the current
+     *                             thread's context class loader (and left installed) so that the ServiceLoader
+     *                             lookup goes through it.
+     * @param extensionName name of the extension; used for validation and in error messages.
+     * @param jobName name of the extension job, handed to the builder.
+     * @param configStream extension job configuration.
+     * @return entities produced by the extension builder.
+     * @throws IOException declared by the signature; not raised directly by the code in this method.
+     * @throws FalconException if no ExtensionBuilder implementation, or more than one, is discovered.
+     * @throws FalconCLIException if the discovered implementation cannot be loaded or instantiated.
+     */
+    public List<Entity> getEntities(ClassLoader extensionClassloader, String extensionName, String jobName,
+                                           InputStream configStream) throws IOException, FalconException {
+        Thread.currentThread().setContextClassLoader(extensionClassloader);
+
+        ServiceLoader<ExtensionBuilder> extensionBuilders = ServiceLoader.load(ExtensionBuilder.class);
+
+        List<Class<? extends ExtensionBuilder>> result = new ArrayList<>();
+
+        for (ExtensionBuilder extensionBuilder : extensionBuilders) {
+            result.add(extensionBuilder.getClass());
+        }
+
+        if (result.isEmpty()) {
+            throw new FalconException("Extension Implementation not found in the package of : " + extensionName);
+        } else if (result.size() > 1) {
+            throw new FalconException("Found more than one extension Implementation in the package of : "
+                    + extensionName);
+        }
+
+        ExtensionBuilder extensionBuilder = null;
+        try {
+            // ClassLoader.loadClass expects the binary name (Outer$Inner). getCanonicalName() yields Outer.Inner
+            // for nested classes and null for local/anonymous ones, so getName() is the correct lookup key.
+            Class<ExtensionBuilder> clazz = (Class<ExtensionBuilder>) extensionClassloader
+                    .loadClass(result.get(0).getName());
+            extensionBuilder = clazz.newInstance();
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+            throw new FalconCLIException("Failed to instantiate extension implementation " + extensionName, e);
+        }
+
+        // NOTE(review): the same stream is handed to both calls below. If validateExtensionConfig reads it to
+        // the end, getEntities will see an exhausted stream -- confirm against the ExtensionBuilder contract.
+        extensionBuilder.validateExtensionConfig(extensionName, configStream);
+        List<Entity> entities = extensionBuilder.getEntities(jobName, configStream);
+
+        return entities;
+    }
+
+    /**
+     * Copies the extension package into a freshly named local stage directory, builds the extension
+     * entities from the copied classpath and then stages the built entities in that directory.
+     *
+     * @param extensionName name of the extension.
+     * @param jobName name of the extension job.
+     * @param configStream extension job configuration.
+     * @param extensionBuildLocation location of the extension package to copy from.
+     * @return entities built by the extension.
+     * @throws IOException if the file system cannot be obtained or the package cannot be copied.
+     * @throws FalconException if the entities cannot be built.
+     */
+    public static List<Entity> loadAndPrepare(String extensionName, String jobName, InputStream configStream,
+                                              String extensionBuildLocation) throws IOException, FalconException {
+        FileSystem fileSystem = FileSystem.get(new Configuration());
+        String stageDir = createStagePath(extensionName, jobName);
+        List<URL> classpath = copyExtensionPackage(extensionBuildLocation, fileSystem, stageDir);
+
+        List<Entity> built = prepare(extensionName, jobName, configStream, classpath);
+        stageEntities(built, stageDir);
+        return built;
+    }
+
+    /**
+     * Builds the extension entities using a class loader created over the given URLs.
+     *
+     * @param extensionName name of the extension.
+     * @param jobName name of the extension job.
+     * @param configStream extension job configuration.
+     * @param urls classpath entries of the extension package.
+     * @return entities built by the extension.
+     * @throws IOException propagated from class loader creation or entity building.
+     * @throws FalconException propagated from entity building.
+     */
+    public static List<Entity> prepare(String extensionName, String jobName, InputStream configStream, List<URL> urls)
+        throws IOException, FalconException {
+        ExtensionHandler handler = new ExtensionHandler();
+        return handler.getEntities(ExtensionClassLoader.load(urls), extensionName, jobName, configStream);
+    }
+
+    /**
+     * Writes each entity as XML into the stage directory, one file per entity named
+     * {@code <ENTITY_TYPE>_<url-encoded entity name>}.
+     * This method is only for debugging, the staged entities can be found in /tmp path.
+     * Failures are logged and never propagated; staging stops at the first file that cannot be newly created.
+     *
+     * @param entities entities to write out.
+     * @param stagePath stage directory, with or without a {@code file://} scheme.
+     */
+    private static void stageEntities(List<Entity> entities, String stagePath) {
+        // stagePath is built from TMP_BASE_DIR and therefore starts with file://, which java.io.File would
+        // interpret as part of a relative file name. Resolve it to the plain local path first.
+        File stageDir = new File(new Path(stagePath).toUri().getPath());
+        for (Entity entity : entities) {
+            EntityType type = entity.getEntityType();
+            try {
+                File entityFile = new File(stageDir, type.toString() + "_"
+                    + URLEncoder.encode(entity.getName(), UTF_8));
+                if (!entityFile.createNewFile()) {
+                    LOG.debug("Not able to stage the entities in the tmp path");
+                    return;
+                }
+                // try-with-resources so the stream is closed even when marshalling fails.
+                try (OutputStream out = new FileOutputStream(entityFile)) {
+                    type.getMarshaller().marshal(entity, out);
+                }
+                LOG.debug("Staged configuration {}/{}", type, entity.getName());
+            } catch (Exception e) {
+                LOG.error("Unable to serialize the entity object {}/{}", type, entity.getName(), e);
+            }
+        }
+    }
+
+    /**
+     * Creates the local stage directory {@code <tmpdir>/<extensionName>/<jobName>/<epoch seconds>}.
+     *
+     * @param extensionName name of the extension.
+     * @param jobName name of the extension job.
+     * @return the stage path, prefixed with {@code file://} as defined by TMP_BASE_DIR.
+     * @throws FalconCLIException if the directory does not exist and cannot be created.
+     */
+    private static String createStagePath(String extensionName, String jobName) {
+        String stagePath = TMP_BASE_DIR + File.separator + extensionName + File.separator + jobName
+                + File.separator + System.currentTimeMillis()/1000;
+        // stagePath carries a file:// scheme, which java.io.File would treat as part of a relative file name;
+        // resolve it to the plain local path before touching the disk.
+        File tmpPath = new File(new Path(stagePath).toUri().getPath());
+        // mkdirs() because the extension/job parent directories may not exist yet. Fail only when the
+        // directory is really unavailable afterwards (mkdirs() returns false if it already exists).
+        if (!tmpPath.mkdirs() && !tmpPath.isDirectory()) {
+            throw new FalconCLIException("Failed to create stage directory" + tmpPath.toString());
+        }
+        return stagePath;
+    }
+
+    /**
+     * Copies the build-time libs and resources of an extension package to the local stage directory.
+     * {@code <extensionBuildUrl>/libs/build} is copied to {@code <stagePath>/libs} and
+     * {@code <extensionBuildUrl>/resources/build} to {@code <stagePath>/resources}.
+     *
+     * @param extensionBuildUrl location of the extension package.
+     * @param fs file system to copy from.
+     * @param stagePath local stage directory.
+     * @return the URLs found under the local libs directory followed by the URL of the local resources directory.
+     * @throws IOException if either copy fails.
+     */
+    public static List<URL> copyExtensionPackage(String extensionBuildUrl, FileSystem fs, String stagePath)
+        throws IOException {
+
+        Path libsPath = new Path(extensionBuildUrl, FalconExtensionConstants.LIBS);
+        Path buildLibsPath = new Path(libsPath, FalconExtensionConstants.BUILD);
+        Path localStagePath = new Path(stagePath);
+        Path localBuildLibsPath = new Path(localStagePath, FalconExtensionConstants.LIBS);
+        LOG.info("Copying build time libs from {} to {}", buildLibsPath, localBuildLibsPath);
+        fs.copyToLocalFile(buildLibsPath, localBuildLibsPath);
+
+        Path resourcesPath = new Path(extensionBuildUrl, FalconExtensionConstants.RESOURCES);
+        Path buildResourcesPath = new Path(resourcesPath, FalconExtensionConstants.BUILD);
+        Path localBuildResourcesPath = new Path(localStagePath, FalconExtensionConstants.RESOURCES);
+        // Log the resources source path here (the libs path was logged by mistake before).
+        LOG.info("Copying build time resources from {} to {}", buildResourcesPath, localBuildResourcesPath);
+        fs.copyToLocalFile(buildResourcesPath, localBuildResourcesPath);
+
+        List<URL> urls = new ArrayList<>();
+        urls.addAll(getFilesInPath(localBuildLibsPath.toUri().toURL()));
+        urls.add(localBuildResourcesPath.toUri().toURL());
+        return urls;
+    }
+
+    /**
+     * Collects the URLs reachable from the given file URL.
+     * For a directory the result holds every regular file below it (sub-directories are walked recursively
+     * and contribute their own entries) followed by the directory URL itself, with a trailing slash.
+     * For anything else the result holds just the given URL.
+     *
+     * @param fileURL URL of a local file or directory.
+     * @return the collected URLs.
+     * @throws MalformedURLException if a URL cannot be formed for one of the visited files.
+     */
+    public static List<URL> getFilesInPath(URL fileURL) throws MalformedURLException {
+        List<URL> urls = new ArrayList<>();
+
+        File file = new File(fileURL.getPath());
+        if (file.isDirectory()) {
+            File[] files = file.listFiles();
+
+            if (files != null) {
+                for (File innerFile : files) {
+                    if (innerFile.isFile()) {
+                        urls.add(innerFile.toURI().toURL());
+                    } else {
+                        // Descend into the child. Recursing on the parent again would never terminate.
+                        urls.addAll(getFilesInPath(innerFile.toURI().toURL()));
+                    }
+                }
+            }
+
+            // Directory entries need a trailing slash to be usable as class path roots by URLClassLoader.
+            if (!fileURL.toString().endsWith("/")) {
+                fileURL = new URL(fileURL.toString() + "/");
+            }
+        }
+
+        urls.add(fileURL);
+        return urls;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 5d6eff5..01dd6c6 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -179,6 +179,18 @@ public abstract class AbstractFalconClient {
                                                 String properties);
 
     /**
+     * Prepare the set of entities the extension has implemented, stage them to a local directory and submit
+     * them too.
+     * @param extensionName extension which is available in the store.
+     * @param jobName name to be used in all the extension entities' tagging that are built as part of
+     *                           loadAndPrepare.
+     * @param configPath path to extension parameters.
+     * @param doAsUser user to submit the job as (presumably a proxy user; confirm against implementations).
+     * @return result of the submit operation.
+     * @throws FalconCLIException NOTE(review): not declared in the signature; confirm which failures are
+     *                            reported through this exception.
+     */
+    public abstract APIResult submitExtensionJob(String extensionName, String jobName, String configPath,
+                                                String doAsUser);
+
+    /**
      *
      * Get list of the entities.
      * We have two filtering parameters for entity tags: "tags" and "tagkeys".

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index dabed3f..4d4517c 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -26,10 +26,14 @@ import com.sun.jersey.client.urlconnection.HTTPSProperties;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.net.util.TrustManagerUtils;
+import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
+import org.apache.falcon.ExtensionHandler;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.DateValidator;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.extensions.ExtensionType;
 import org.apache.falcon.metadata.RelationshipType;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.EntityList;
@@ -47,6 +51,8 @@ import org.apache.falcon.resource.TriageResult;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
 import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
@@ -118,6 +124,7 @@ public class FalconClient extends AbstractFalconClient {
 
 
     public static final String DO_AS_OPT = "doAs";
+    public static final String ENTITIES_OPT = "entities";
     /**
      * Name of the HTTP cookie used for the authentication token between the client and the server.
      */
@@ -134,6 +141,7 @@ public class FalconClient extends AbstractFalconClient {
             return true;
         }
     };
+    private static final String TAG_SEPARATOR = ",";
     private final WebResource service;
     private final AuthenticatedURL.Token authenticationToken;
 
@@ -1059,12 +1067,48 @@ public class FalconClient extends AbstractFalconClient {
         return getResponse(String.class, clientResponse);
     }
 
-    public APIResult submitExtensionJob(final String extensionName, final String filePath, final String doAsUser) {
-        InputStream entityStream = getServletInputStream(filePath);
+    /**
+     * Submits an extension job.
+     * The extension detail (type and location) is fetched first. For a CUSTOM extension the entities are
+     * built locally through ExtensionHandler.loadAndPrepare and tagged with the extension and job name.
+     * Finally the configuration stream is posted to the SUBMIT endpoint of the extension.
+     *
+     * @param extensionName name of the extension to submit a job for.
+     * @param jobName name of the extension job; used when building and tagging entities.
+     * @param configPath path to the extension job configuration.
+     * @param doAsUser value sent as the doAs query parameter.
+     * @return result of the SUBMIT call; {@code null} when the extension detail cannot be parsed, when
+     *         building the entities fails or when no entities were built (a message is printed in those cases).
+     */
+    @Override
+    public APIResult submitExtensionJob(final String extensionName, final String jobName, final String configPath,
+                                        final String doAsUser) {
         ClientResponse clientResponse = new ResourceBuilder()
+                // The detail lookup has to name the extension, the same way the SUBMIT call below does.
+                .path(ExtensionOperations.DETAIL.path, extensionName)
+                .call(ExtensionOperations.DETAIL);
+        JSONObject responseJson = clientResponse.getEntity(JSONObject.class);
+        ExtensionType extensionType;
+        String extensionBuildLocation;
+        try {
+            JSONObject extensionDetailsJson = new JSONObject(responseJson.get("detail").toString());
+            extensionType = ExtensionType.valueOf(extensionDetailsJson.get("type").toString().toUpperCase());
+            extensionBuildLocation = extensionDetailsJson.get("location").toString();
+        } catch (JSONException e) {
+            OUT.get().print("Error. " + extensionName + " not found ");
+            return null;
+        }
+        InputStream configStream = getServletInputStream(configPath);
+
+        List<Entity> entities;
+        if (extensionType.equals(ExtensionType.CUSTOM)) {
+            try {
+                entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream, extensionBuildLocation);
+            } catch (Exception e) {
+                OUT.get().println("Error in building the extension");
+                return null;
+            }
+            if (entities == null || entities.isEmpty()) {
+                OUT.get().println("No entities got built");
+                return null;
+            }
+            try {
+                EntityUtil.applyTags(extensionName, jobName, entities);
+            } catch (FalconException e) {
+                OUT.get().println("Error in applying tags to generated entities");
+            }
+        }
+
+        // NOTE(review): the entities built above are not part of this request, and for CUSTOM extensions
+        // configStream has already been handed to loadAndPrepare -- confirm the server receives what is
+        // intended. Callers that dereference the result must also cope with the null returns above.
+        clientResponse = new ResourceBuilder()
                 .path(ExtensionOperations.SUBMIT.path, extensionName)
                 .addQueryParam(DO_AS_OPT, doAsUser)
-                .call(ExtensionOperations.SUBMIT, entityStream);
+                .call(ExtensionOperations.SUBMIT, configStream);
         return getResponse(APIResult.class, clientResponse);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/client/src/main/java/org/apache/falcon/client/FalconExtensionConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconExtensionConstants.java b/client/src/main/java/org/apache/falcon/client/FalconExtensionConstants.java
new file mode 100644
index 0000000..086a8bb
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/client/FalconExtensionConstants.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.client;
+
+/**
+ * Names used when working with Falcon extension packages.
+ */
+public final class FalconExtensionConstants {
+
+    /** Directory name for libraries. */
+    public static final String LIBS = "libs";
+    /** Directory name for resources. */
+    public static final String RESOURCES = "resources";
+    /** Directory name for build-time artifacts. */
+    public static final String BUILD = "build";
+    public static final String META_INF = "META-INF";
+    public static final String SERVICES = "SERVICES";
+
+    /** Constants holder; not meant to be instantiated. */
+    private FalconExtensionConstants() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index f3d5d28..183661b 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -93,8 +93,12 @@ public final class EntityUtil {
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
     public static final String WF_LIB_SEPARATOR = ",";
+    public static final String TAG_SEPARATOR = ",";
     private static final String STAGING_DIR_NAME_SEPARATOR = "_";
 
+    public static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name=";
+    public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
+
     public static final ThreadLocal<SimpleDateFormat> PATH_FORMAT = new ThreadLocal<SimpleDateFormat>() {
         @Override
         protected SimpleDateFormat initialValue() {
@@ -1194,6 +1198,48 @@ public final class EntityUtil {
         }
     }
 
+    /**
+     * Set the tags on a given entity by casting it to its concrete Process, Feed or Cluster class.
+     * @param entity entity to update; any other entity type results in an IllegalArgumentException
+     * @param tags tag string handed unchanged to the concrete entity's setTags
+     */
+    public static void setEntityTags(Entity entity, String tags) {
+        switch (entity.getEntityType()) {
+        case PROCESS:
+            ((Process) entity).setTags(tags);
+            break;
+        case FEED:
+            ((Feed) entity).setTags(tags);
+            break;
+        case CLUSTER:
+            ((Cluster) entity).setTags(tags);
+            break;
+        default:
+            throw new IllegalArgumentException("Unhandled entity type " + entity.getEntityType());
+        }
+    }
+
+    /** Appends the extension name and job tags to every entity; rejects entities already using those prefixes. */
+    public static void applyTags(String extensionName, String jobName, List<Entity> entities) throws FalconException {
+        // The extension tags are the same for every entity, so assemble them once up front.
+        String extensionTags = TAG_PREFIX_EXTENSION_NAME + extensionName + TAG_SEPARATOR
+                + TAG_PREFIX_EXTENSION_JOB + jobName;
+        for (Entity entity : entities) {
+            String existingTags = entity.getTags();
+            if (StringUtils.isEmpty(existingTags)) {
+                setEntityTags(entity, extensionTags);
+                continue;
+            }
+            for (String reserved : new String[]{TAG_PREFIX_EXTENSION_NAME, TAG_PREFIX_EXTENSION_JOB}) {
+                if (existingTags.contains(reserved)) {
+                    throw new FalconException("Generated extension entity " + entity.getName()
+                            + " should not contain tag prefix " + reserved);
+                }
+            }
+            setEntityTags(entity, existingTags + TAG_SEPARATOR + extensionTags);
+        }
+    }
+
 
     /**
      * @param properties - String of format key1:value1, key2:value2

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/docs/src/site/twiki/falconcli/ExtensionSubmit.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/falconcli/ExtensionSubmit.twiki b/docs/src/site/twiki/falconcli/ExtensionSubmit.twiki
index 40a7b44..2cda478 100644
--- a/docs/src/site/twiki/falconcli/ExtensionSubmit.twiki
+++ b/docs/src/site/twiki/falconcli/ExtensionSubmit.twiki
@@ -5,7 +5,7 @@
 Submit an extension job.
 
 Usage:
-$FALCON_HOME/bin/falcon extension -submit -extensionName <<extension-name>> -file <<path-to-file>>
+$FALCON_HOME/bin/falcon extension -submit -extensionName <<extension-name>> -jobName <<job-name>> -file <<path-to-file>>
 
 Optional Args : -doAs <<user-name>>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/docs/src/site/twiki/falconcli/ExtensionSubmitAndSchedule.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/falconcli/ExtensionSubmitAndSchedule.twiki b/docs/src/site/twiki/falconcli/ExtensionSubmitAndSchedule.twiki
index ea1c0e2..4dbd76d 100644
--- a/docs/src/site/twiki/falconcli/ExtensionSubmitAndSchedule.twiki
+++ b/docs/src/site/twiki/falconcli/ExtensionSubmitAndSchedule.twiki
@@ -5,7 +5,7 @@
 Submit and schedule an extension job.
 
 Usage:
-$FALCON_HOME/bin/falcon extension -submitAndSchedule -extensionName <<extension-name>> -file <<path-to-file>>
+$FALCON_HOME/bin/falcon extension -submitAndSchedule -extensionName <<extension-name>> -jobName <<job-name>> -file <<path-to-file>>
 
 Optional Args : -doAs <<user-name>>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/extensions/src/test/java/org/apache/falcon/ExtensionExample.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/ExtensionExample.java b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java
new file mode 100644
index 0000000..f527f2e
--- /dev/null
+++ b/extensions/src/test/java/org/apache/falcon/ExtensionExample.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.Schema;
+import org.apache.falcon.extensions.ExtensionBuilder;
+
+import javax.xml.bind.JAXBException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Extension Example for testing extension loading and preparing entities.
+ */
+public class ExtensionExample implements ExtensionBuilder {
+    /** Classpath resource holding the process entity returned by getEntities. */
+    public static final String PROCESS_XML = "/process.xml";
+
+    @Override
+    public List<Entity> getEntities(String extensionName, InputStream extensionConfigStream) throws FalconException {
+        try {
+            Entity process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal(
+                    getClass().getResourceAsStream(PROCESS_XML));
+            List<Entity> entities = new ArrayList<>();
+            entities.add(process);
+            return entities;
+        } catch (JAXBException e) {
+            // Chain the JAXB failure as the cause so the underlying parse error is not lost.
+            throw new FalconException("Failed in unmarshalling the entity", e);
+        }
+    }
+
+    @Override
+    public void validateExtensionConfig(String extensionName, InputStream extensionConfigStream)
+        throws FalconException {
+        // No-op: this example performs no validation of the supplied configuration.
+    }
+
+    @Override
+    public List<Pair<String, Schema>> getOutputSchemas(String extensionName) throws FalconException {
+        return null;
+    }
+
+    /** Returns the given string unchanged. */
+    public String toString(String testString) {
+        return testString;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder b/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder
new file mode 100644
index 0000000..a8d3cf8
--- /dev/null
+++ b/extensions/src/test/resources/META-INF/services/org.apache.falcon.extensions.ExtensionBuilder
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+ org.apache.falcon.ExtensionExample
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/extensions/src/test/resources/process.xml
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/process.xml b/extensions/src/test/resources/process.xml
new file mode 100644
index 0000000..48e3a16
--- /dev/null
+++ b/extensions/src/test/resources/process.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="sample" version="0" xmlns="uri:falcon:process:0.1">
+    <tags>consumer=consumer@xyz.com,owner=producer@xyz.com,_department_type=forecasting</tags>
+    <pipelines>testPipeline</pipelines>
+    <clusters>
+        <cluster name="testCluster">
+            <validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/>
+
+    <!-- what -->
+    <inputs>
+        <input name="impression" feed="impressionFeed" start="today(0,0)" end="today(2,0)" partition="*/US"/>
+        <input name="clicks" feed="clicksFeed" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="impOutput" feed="imp-click-join1" instance="today(0,0)"/>
+        <output name="clicksOutput" feed="imp-click-join2" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <properties>
+        <property name="name1" value="value1"/>
+        <property name="name2" value="value2"/>
+    </properties>
+
+    <workflow engine="oozie" path="/falcon/test/workflow"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+
+    <late-process policy="exp-backoff" delay="hours(1)">
+        <late-input input="impression" workflow-path="himpression/late/workflow"/>
+        <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/>
+    </late-process>
+
+    <notification type="email" to="falcon@localhost"/>
+</process>

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
index 266e631..6f2974d 100644
--- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
@@ -21,12 +21,10 @@ package org.apache.falcon.resource.extensions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.parser.ValidationException;
 import org.apache.falcon.entity.store.StoreAccessException;
 import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.extensions.Extension;
 import org.apache.falcon.extensions.ExtensionProperties;
 import org.apache.falcon.extensions.ExtensionService;
@@ -517,44 +515,12 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
         List<Entity> entities = extension.getEntities(extensionName, request.getInputStream());
 
         // add tags on extension name and job
-        for (Entity entity : entities) {
-            String tags = entity.getTags();
-            if (StringUtils.isNotEmpty(tags)) {
-                if (tags.contains(TAG_PREFIX_EXTENSION_NAME)) {
-                    throw new FalconException("Generated extention entity " + entity.getName()
-                            + " should not contain tag prefix " + TAG_PREFIX_EXTENSION_NAME);
-                }
-                if (tags.contains(TAG_PREFIX_EXTENSION_JOB)) {
-                    throw new FalconException("Generated extention entity " + entity.getName()
-                            + " should not contain tag prefix " + TAG_PREFIX_EXTENSION_JOB);
-                }
-                setEntityTags(entity, tags + TAG_SEPARATOR + TAG_PREFIX_EXTENSION_NAME + extensionName + TAG_SEPARATOR
-                        + TAG_PREFIX_EXTENSION_JOB + properties.getProperty(ExtensionProperties.JOB_NAME.getName()));
-            } else {
-                setEntityTags(entity, TAG_PREFIX_EXTENSION_NAME + extensionName + TAG_SEPARATOR
-                        + TAG_PREFIX_EXTENSION_JOB + properties.getProperty(ExtensionProperties.JOB_NAME.getName()));
-            }
-        }
+        String jobName = properties.getProperty(ExtensionProperties.JOB_NAME.getName());
+        EntityUtil.applyTags(extensionName, jobName, entities);
 
         return entities;
     }
 
-    private void setEntityTags(Entity entity, String tags) {
-        switch (entity.getEntityType()) {
-        case PROCESS:
-            ((Process) entity).setTags(tags);
-            break;
-        case FEED:
-            ((Feed) entity).setTags(tags);
-            break;
-        case CLUSTER:
-            ((Cluster) entity).setTags(tags);
-            break;
-        default:
-            LOG.error("Unknown entity type: {}", entity.getEntityType().name());
-        }
-    }
-
     private  JSONObject buildDetailResult(final String extensionName) throws FalconException {
         ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore();
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/unit/pom.xml
----------------------------------------------------------------------
diff --git a/unit/pom.xml b/unit/pom.xml
index 24b39b7..6405460 100644
--- a/unit/pom.xml
+++ b/unit/pom.xml
@@ -132,6 +132,29 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <phase>process-sources</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>${project.groupId}</groupId>
+                                    <artifactId>falcon-extensions</artifactId>
+                                    <version>${project.version}</version>
+                                    <type>test-jar</type>
+                                </artifactItem>
+                            </artifactItems>
+                            <outputDirectory>${basedir}/src/test/resources</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 53073f0..7248964 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -266,6 +266,12 @@ public class FalconUnitClient extends AbstractFalconClient {
     }
 
     @Override
+    public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
+        // TODO: support extension job submission in the Falcon unit client (the original note called it "recipe").
+        throw new UnsupportedOperationException("Not yet Implemented");
+    }
+
+    @Override
     public EntityList getEntityList(String entityType, String fields, String nameSubsequence, String tagKeywords,
                                     String filterBy, String filterTags, String orderBy, String sortOrder,
                                     Integer offset, Integer numResults, String doAsUser) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/unit/src/test/java/org/apache/falcon/ExtensionClassLoaderTest.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/ExtensionClassLoaderTest.java b/unit/src/test/java/org/apache/falcon/ExtensionClassLoaderTest.java
new file mode 100644
index 0000000..dbd8603
--- /dev/null
+++ b/unit/src/test/java/org/apache/falcon/ExtensionClassLoaderTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test Class for validating Extension Class Loader.
+ */
+public class ExtensionClassLoaderTest {
+    public static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources";
+
+    @Test
+    public void testManuallyLoadedClass() throws Exception {
+        List<URL> urls = new ArrayList<>();
+        urls.addAll(ExtensionHandler.getFilesInPath(new Path(JARS_DIR).toUri().toURL()));
+        ClassLoader loader = ExtensionClassLoader.load(urls);
+        Object exampleExtension;
+        Method methodToString;
+        ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(loader);
+        try {
+            Class<?> classManuallyLoaded = loader.loadClass("org.apache.falcon.ExtensionExample");
+            exampleExtension = classManuallyLoaded.newInstance();
+            methodToString = classManuallyLoaded.getMethod("toString", String.class);
+        } finally {
+            // Always restore the loader so a failure here cannot leak it to later tests on this thread.
+            Thread.currentThread().setContextClassLoader(previousClassLoader);
+        }
+        Assert.assertEquals(methodToString.invoke(exampleExtension, "test"), "test");
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java b/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java
new file mode 100644
index 0000000..7e931d7
--- /dev/null
+++ b/unit/src/test/java/org/apache/falcon/ExtensionUtilTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon;
+
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test Class for validating Extension util helper methods.
+ */
+public class ExtensionUtilTest {
+    public static final String PROCESS_XML = "/process.xml";
+    public static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources";
+    // Plain filesystem path: FileInputStream expects a path name and cannot open a "file:///" URI string.
+    public static final String CONFIG_PATH = System.getProperty("user.dir")
+            + "/src/test/resources/extension.properties";
+
+    @Test
+    public void testPrepareAndSetEntityTags() throws Exception {
+        Entity process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal(
+                getClass().getResourceAsStream(PROCESS_XML));
+        EntityUtil.setEntityTags(process, "testTag");
+        Assert.assertTrue(EntityUtil.getTags(process).contains("testTag"));
+
+        List<URL> urls = new ArrayList<>();
+        urls.addAll(ExtensionHandler.getFilesInPath(new Path(JARS_DIR).toUri().toURL()));
+
+        // try-with-resources closes the config stream; a missing file now fails the test instead of
+        // silently handing a null stream to ExtensionHandler.prepare.
+        try (InputStream configStream = new FileInputStream(CONFIG_PATH)) {
+            List<Entity> entities = ExtensionHandler.prepare("extensionName", "jobName", configStream, urls);
+            Assert.assertEquals(entities.size(), 1);
+            Assert.assertEquals(entities.get(0), process);
+        } catch (FileNotFoundException e) {
+            Assert.fail("Test configuration not found: " + CONFIG_PATH, e);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f508799/unit/src/test/resources/extension.properties
----------------------------------------------------------------------
diff --git a/unit/src/test/resources/extension.properties b/unit/src/test/resources/extension.properties
new file mode 100644
index 0000000..d52de1e
--- /dev/null
+++ b/unit/src/test/resources/extension.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+####################################################
+####    This is used for falcon packaging only. ####
+####################################################
+
+pipelines.name=test


Mime
View raw message