falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [1/5] git commit: FALCON-635 Add recipe option in falcon client. Contributed by Sowmya Ramesh
Date Fri, 12 Sep 2014 20:29:29 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master bfc8766b3 -> fa0f61269


FALCON-635 Add recipe option in falcon client. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/c54c1ef2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/c54c1ef2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/c54c1ef2

Branch: refs/heads/master
Commit: c54c1ef286757e67ef1da756698076c0f8cf6834
Parents: bfc8766
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Fri Sep 12 13:25:43 2014 -0700
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Fri Sep 12 13:25:43 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 client/pom.xml                                  |   4 +
 .../java/org/apache/falcon/cli/FalconCLI.java   |  44 ++-
 .../org/apache/falcon/client/FalconClient.java  |  60 ++++
 .../org/apache/falcon/recipe/RecipeTool.java    | 274 +++++++++++++++++++
 .../apache/falcon/recipe/RecipeToolArgs.java    |  70 +++++
 .../apache/falcon/recipe/RecipeToolOptions.java |  60 ++++
 docs/src/site/twiki/FalconCLI.twiki             |  19 ++
 docs/src/site/twiki/FalconDocumentation.twiki   |   9 +-
 docs/src/site/twiki/recipes.twiki               |  71 +++++
 .../java/org/apache/falcon/cli/FalconCLIIT.java |  66 ++++-
 .../org/apache/falcon/resource/TestContext.java |  39 ++-
 webapp/src/test/resources/client.properties     |   3 +-
 webapp/src/test/resources/process-template.xml  |   6 +-
 webapp/src/test/resources/process.properties    |  25 ++
 15 files changed, 732 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 149f4e6..b85ca8b 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-635 Add recipe option in falcon client (Sowmya Ramesh via
+   Venkatesh Seetharam)
+
    FALCON-615 Add pipleline element to lineage graph
    (Sowmya Ramesh via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 70e0db6..e76865d 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -52,6 +52,10 @@
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-auth</artifactId>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </dependency>
             </dependencies>
         </profile>
     </profiles>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index d6e3598..fc2236b 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -121,6 +121,11 @@ public class FalconCLI {
     public static final String VALUE_OPT = "value";
     public static final String DIRECTION_OPT = "direction";
 
+    // Recipe Command
+    public static final String RECIPE_CMD = "recipe";
+    public static final String RECIPE_NAME = "name";
+    public static final String RECIPE_TOOL_CLASS_NAME = "tool";
+
     private final Properties clientProperties;
 
     public FalconCLI() throws Exception {
@@ -168,6 +173,7 @@ public class FalconCLI {
                 "Process instances operations like running, status, kill, suspend, resume, rerun, logs",
                 instanceOptions(), false);
         parser.addCommand(GRAPH_CMD, "", "graph operations", createGraphOptions(), true);
+        parser.addCommand(RECIPE_CMD, "", "recipe operations", createRecipeOptions(), true);
 
         try {
             CLIParser.Command command = parser.parse(args);
@@ -187,6 +193,8 @@ public class FalconCLI {
                     instanceCommand(commandLine, client);
                 } else if (command.getName().equals(GRAPH_CMD)) {
                     graphCommand(commandLine, client);
+                } else if (command.getName().equals(RECIPE_CMD)) {
+                    recipeCommand(commandLine, client);
                 }
             }
             return exitValue;
@@ -357,21 +365,21 @@ public class FalconCLI {
         String entityAction = "entity";
 
         if (optionsList.contains(SUBMIT_OPT)) {
-            validateFilePath(filePath);
+            validateFilePath(filePath, "file");
             validateColo(optionsList);
             result = client.submit(entityType, filePath);
         } else if (optionsList.contains(UPDATE_OPT)) {
-            validateFilePath(filePath);
+            validateFilePath(filePath, "file");
             validateColo(optionsList);
             validateEntityName(entityName);
             Date effectiveTime = parseDateString(time);
             result = client.update(entityType, entityName, filePath, effectiveTime);
         } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) {
-            validateFilePath(filePath);
+            validateFilePath(filePath, "file");
             validateColo(optionsList);
             result = client.submitAndSchedule(entityType, filePath);
         } else if (optionsList.contains(VALIDATE_OPT)) {
-            validateFilePath(filePath);
+            validateFilePath(filePath, "file");
             validateColo(optionsList);
             result = client.validate(entityType, filePath);
         } else if (optionsList.contains(SCHEDULE_OPT)) {
@@ -448,11 +456,11 @@ public class FalconCLI {
         return colo;
     }
 
-    private void validateFilePath(String filePath)
+    private void validateFilePath(String filePath, String argument)
         throws FalconCLIException {
 
         if (StringUtils.isEmpty(filePath)) {
-            throw new FalconCLIException("Missing argument: file");
+            throw new FalconCLIException("Missing argument: " + argument);
         }
     }
 
@@ -823,6 +831,20 @@ public class FalconCLI {
         return graphOptions;
     }
 
+    private Options createRecipeOptions() {
+        Options recipeOptions = new Options();
+        Option url = new Option(URL_OPTION, true, "Falcon URL");
+        recipeOptions.addOption(url);
+
+        Option recipeFileOpt = new Option(RECIPE_NAME, true, "recipe name");
+        recipeOptions.addOption(recipeFileOpt);
+
+        Option recipeToolClassName = new Option(RECIPE_TOOL_CLASS_NAME, true, "recipe class");
+        recipeOptions.addOption(recipeToolClassName);
+
+        return recipeOptions;
+    }
+
     private void graphCommand(CommandLine commandLine,
                               FalconClient client) throws FalconCLIException {
         Set<String> optionsList = new HashSet<String>();
@@ -965,4 +987,14 @@ public class FalconCLI {
         }
         return null;
     }
+
+    private void recipeCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException {
+        String recipeName = commandLine.getOptionValue(RECIPE_NAME);
+        String recipeToolClass = commandLine.getOptionValue(RECIPE_TOOL_CLASS_NAME);
+
+        validateFilePath(recipeName, RECIPE_NAME);
+
+        String result = client.submitRecipe(recipeName, recipeToolClass);
+        OUT.get().println(result);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 73635fa..a63a76b 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -29,6 +29,8 @@ import org.apache.commons.net.util.TrustManagerUtils;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.recipe.RecipeTool;
+import org.apache.falcon.recipe.RecipeToolArgs;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.EntitySummaryResult;
@@ -49,12 +51,14 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.security.SecureRandom;
 import java.util.Date;
@@ -82,6 +86,9 @@ public class FalconClient {
     private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";
     private static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator();
 
+    private static final String TEMPLATE_SUFFIX = "-template.xml";
+    private static final String PROPERTIES_SUFFIX = ".properties";
+
     public static final int DEFAULT_NUM_RESULTS = 10;
 
     public static final HostnameVerifier ALL_TRUSTING_HOSTNAME_VERIFIER = new HostnameVerifier() {
@@ -967,6 +974,59 @@ public class FalconClient {
         return sendGraphRequest(GraphOperations.EDGES, id);
     }
 
+    public String submitRecipe(String recipeName,
+                               String recipeToolClassName) throws FalconCLIException {
+        String recipePath = clientProperties.getProperty("falcon.recipe.path",
+            System.getProperty("falcon.home"));
+
+        String recipeFilePath = recipePath + File.separator + recipeName + TEMPLATE_SUFFIX;
+        File file = new File(recipeFilePath);
+        if (!file.exists()) {
+            throw new FalconCLIException("Recipe template file does not exist : " + recipeFilePath);
+        }
+
+        String propertiesFilePath = recipePath + File.separator + recipeName + PROPERTIES_SUFFIX;
+        file = new File(propertiesFilePath);
+        if (!file.exists()) {
+            throw new FalconCLIException("Recipe properties file does not exist : " + propertiesFilePath);
+        }
+
+        String processFile = null;
+        try {
+            String prefix =  "falcon-recipe" + "-" + System.currentTimeMillis();
+            File tmpPath = new File("/tmp");
+            if (!tmpPath.exists()) {
+                if (!tmpPath.mkdir()) {
+                    throw new FalconCLIException("Creating directory failed: " + tmpPath.getAbsolutePath());
+                }
+            }
+            File f = File.createTempFile(prefix, ".xml", tmpPath);
+            f.deleteOnExit();
+
+            processFile = f.getAbsolutePath();
+            String[] args = {
+                "-" + RecipeToolArgs.RECIPE_FILE_ARG.getName(), recipeFilePath,
+                "-" + RecipeToolArgs.RECIPE_PROPERTIES_FILE_ARG.getName(), propertiesFilePath,
+                "-" + RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG.getName(), processFile,
+            };
+
+            if (recipeToolClassName != null) {
+                Class<?> clz = Class.forName(recipeToolClassName);
+                Method method = clz.getMethod("main", String[].class);
+                method.invoke(null, args);
+            } else {
+                RecipeTool.main(args);
+            }
+            validate(EntityType.PROCESS.toString(), processFile);
+            String result = submitAndSchedule(EntityType.PROCESS.toString(), processFile);
+            return result + System.getProperty("line.separator") + "Submitted process entity: " + processFile;
+        } catch (Exception e) {
+            String msg = (processFile == null) ? e.getMessage()
+                : e.getMessage() + System.getProperty("line.separator") + "Submitted process entity: " + processFile;
+            throw new FalconCLIException(msg, e);
+        }
+    }
+
     private String sendGraphRequest(GraphOperations job, String id) throws FalconCLIException {
         ClientResponse clientResponse = service.path(job.path)
                 .path(id)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
new file mode 100644
index 0000000..0e94b62
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.commons.cli.Options;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Base recipe tool for Falcon recipes.
+ */
+public class RecipeTool extends Configured implements Tool {
+    private static final String HDFS_WF_PATH = "falcon" + File.separator + "recipes" + File.separator;
+    private static final String RECIPE_PREFIX = "falcon.recipe.";
+    private static final Pattern RECIPE_VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##");
+
+    private FileSystem hdfsFileSystem;
+
+    public static void main(String[] args) throws Exception {
+        ToolRunner.run(new Configuration(), new RecipeTool(), args);
+    }
+
+    @Override
+    public int run(String[] arguments) throws Exception {
+        Map<RecipeToolArgs, String> argMap = setupArgs(arguments);
+        if (argMap == null || argMap.isEmpty()) {
+            throw new Exception("Arguments passed to recipe is null");
+        }
+
+        String recipePropertiesFilePath = argMap.get(RecipeToolArgs.RECIPE_PROPERTIES_FILE_ARG);
+        Properties recipeProperties = loadProperties(recipePropertiesFilePath);
+        validateProperties(recipeProperties);
+
+        validateArtifacts(recipeProperties);
+        String recipeName = FilenameUtils.getBaseName(recipePropertiesFilePath);
+
+        FileSystem fs = getFileSystemForHdfs(recipeProperties);
+        copyFilesToHdfsIfRequired(recipeProperties, fs, recipeName);
+
+        Map<String, String> overlayMap = getOverlay(recipeProperties);
+        overlayParametersOverTemplate(argMap.get(RecipeToolArgs.RECIPE_FILE_ARG), argMap.get(RecipeToolArgs
+                .RECIPE_PROCESS_XML_FILE_PATH_ARG), overlayMap);
+
+        System.out.println("Completed disaster recovery");
+        return 0;
+    }
+
+    private Map<RecipeToolArgs, String> setupArgs(final String[] arguments) throws ParseException {
+        Options options = new Options();
+        Map<RecipeToolArgs, String> argMap = new HashMap<RecipeToolArgs, String>();
+
+        for (RecipeToolArgs arg : RecipeToolArgs.values()) {
+            addOption(options, arg, arg.isRequired());
+        }
+
+        CommandLine cmd =  new GnuParser().parse(options, arguments);
+        for (RecipeToolArgs arg : RecipeToolArgs.values()) {
+            String optionValue = arg.getOptionValue(cmd);
+            if (StringUtils.isNotEmpty(optionValue)) {
+                argMap.put(arg, optionValue);
+            }
+        }
+        return argMap;
+    }
+
+    private static void addOption(final Options options, final RecipeToolArgs arg,
+                                  final boolean isRequired) {
+        Option option = arg.getOption();
+        option.setRequired(isRequired);
+        options.addOption(option);
+    }
+
+    private static void validateProperties(final Properties recipeProperties) {
+        for (RecipeToolOptions option : RecipeToolOptions.values()) {
+            if (recipeProperties.getProperty(option.getName()) == null && option.isRequired()) {
+                throw new IllegalArgumentException("Missing argument: " + option.getName());
+            }
+        }
+    }
+
+    private static Properties loadProperties(final String propertiesFilePath) throws Exception {
+        InputStream inputStream = null;
+        try {
+            inputStream = new FileInputStream(propertiesFilePath);
+            Properties prop = new Properties();
+            prop.load(inputStream);
+            return prop;
+        } finally {
+            IOUtils.closeQuietly(inputStream);
+        }
+    }
+
+    private static void validateArtifacts(final Properties recipeProperties) throws Exception{
+        // validate the WF path
+        String wfPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_PATH.getName());
+
+        // If the file doesn't exist locally throw exception
+        if (!StringUtils.isEmpty(wfPath) && !doesFileExist(wfPath)) {
+            throw new Exception("Recipe workflow file does not exist : " + wfPath);
+        }
+
+        // validate lib path
+        String libPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_LIB_PATH.getName());
+        if (!StringUtils.isEmpty(libPath) && !doesFileExist(libPath)) {
+            throw new Exception("Recipe lib file path does not exist : " + libPath);
+        }
+    }
+
+    private static Map<String, String> getOverlay(final Properties recipeProperties) {
+        Map<String, String> overlay = new HashMap<String, String>();
+        for (Map.Entry<Object, Object> entry : recipeProperties.entrySet()) {
+            String key = StringUtils.removeStart((String) entry.getKey(), RECIPE_PREFIX);
+            overlay.put(key, (String) entry.getValue());
+        }
+
+        return overlay;
+    }
+
+    private static String overlayParametersOverTemplate(final String templateFile,
+                                                        final String outFilename,
+                                                        Map<String, String> overlay) throws Exception {
+        if (templateFile == null || outFilename == null || overlay == null || overlay.isEmpty()) {
+            throw new IllegalArgumentException("Invalid arguments passed");
+        }
+
+        String line;
+        OutputStream out = null;
+        BufferedReader reader = null;
+
+        try {
+            out = new FileOutputStream(outFilename);
+
+            reader = new BufferedReader(new FileReader(templateFile));
+            while ((line = reader.readLine()) != null) {
+                Matcher matcher = RECIPE_VAR_PATTERN.matcher(line);
+                while (matcher.find()) {
+                    String variable = line.substring(matcher.start(), matcher.end());
+                    String paramString = overlay.get(variable.substring(2, variable.length() - 2));
+                    if (paramString == null) {
+                        throw new Exception("Match not found for the template: " + variable);
+                    }
+                    line = line.replace(variable, paramString);
+                    matcher = RECIPE_VAR_PATTERN.matcher(line);
+                }
+                out.write(line.getBytes());
+                out.write("\n".getBytes());
+            }
+        } finally {
+            IOUtils.closeQuietly(reader);
+            IOUtils.closeQuietly(out);
+        }
+        return outFilename;
+    }
+
+    private static void copyFilesToHdfsIfRequired(final Properties recipeProperties,
+                                                  final FileSystem fs,
+                                                  final String recipeName) throws Exception {
+        String recipeWfPathName = RecipeToolOptions.WORKFLOW_PATH.getName();
+        String wfPath = recipeProperties.getProperty(recipeWfPathName);
+        String wfPathValue;
+
+        String hdfsPath = HDFS_WF_PATH + recipeName + File.separator;
+        if (!StringUtils.isEmpty(wfPath)) {
+            createDirOnHdfs(hdfsPath, fs);
+            if (new File(wfPath).isDirectory()) {
+                wfPathValue = hdfsPath + getLastPartOfPath(wfPath);
+            } else {
+                wfPathValue = hdfsPath + new File(wfPath).getName();
+            }
+            copyFileFromLocalToHdfs(wfPath, hdfsPath, fs);
+            // Update the property with the hdfs path
+            recipeProperties.setProperty(recipeWfPathName, wfPathValue);
+            System.out.println("recipeWfPathName: " + recipeProperties.getProperty(recipeWfPathName));
+        }
+
+        String recipeWfLibPathName = RecipeToolOptions.WORKFLOW_LIB_PATH.getName();
+        String libPath = recipeProperties.getProperty(recipeWfLibPathName);
+        String libPathValue;
+        if (!StringUtils.isEmpty(libPath)) {
+            if (new File(libPath).isDirectory()) {
+                libPathValue = hdfsPath + getLastPartOfPath(libPath);
+                copyFileFromLocalToHdfs(libPath, hdfsPath, fs);
+            } else {
+                libPathValue = hdfsPath + "lib" + File.separator + new File(libPath).getName();
+                copyFileFromLocalToHdfs(libPath, libPathValue, fs);
+            }
+
+            // Update the property with the hdfs path
+            recipeProperties.setProperty(recipeWfLibPathName, libPathValue);
+            System.out.println("recipeWfLibPathName: " + recipeProperties.getProperty(recipeWfLibPathName));
+        }
+    }
+
+    private static String getLastPartOfPath(final String path) {
+        String normalizedWfPath = FilenameUtils.normalizeNoEndSeparator(path);
+        return (normalizedWfPath == null) ? FilenameUtils.getName(path)
+                : FilenameUtils.getName(normalizedWfPath);
+    }
+
+    private static void createDirOnHdfs(String path, FileSystem fs) throws IOException {
+        Path hdfsPath = new Path(path);
+        if (!fs.exists(hdfsPath)) {
+            fs.mkdirs(hdfsPath);
+        }
+    }
+
+    private static boolean doesFileExist(final String filename) {
+        return new File(filename).exists();
+    }
+
+    private static void copyFileFromLocalToHdfs(final String localFilePath,
+                                                final String hdfsFilePath,
+                                                final FileSystem fs) throws IOException {
+        // For cases where validation of process entity file fails, the artifacts would have been already copied to
+        // HDFS. Set overwrite to true so that next submit recipe copies updated artifacts from local FS to HDFS
+        fs.copyFromLocalFile(false, true, new Path(localFilePath), new Path(hdfsFilePath));
+    }
+
+    private static Configuration getConfiguration(final String storageEndpoint) throws Exception {
+        Configuration conf = new Configuration();
+        conf.set("fs.default.name", storageEndpoint);
+        return conf;
+    }
+
+    private FileSystem getFileSystemForHdfs(final Properties recipeProperties) throws Exception {
+        if (hdfsFileSystem == null) {
+            String storageEndpoint = RecipeToolOptions.SOURCE_CLUSTER_HDFS_WRITE_ENDPOINT.getName();
+            hdfsFileSystem =  FileSystem.get(
+                    getConfiguration(recipeProperties.getProperty(storageEndpoint)));
+        }
+
+        return hdfsFileSystem;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java b/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
new file mode 100644
index 0000000..baa4846
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+
+/**
+ * Recipe tool args.
+ */
+public enum RecipeToolArgs {
+    RECIPE_FILE_ARG("file", "recipe file path"),
+    RECIPE_PROPERTIES_FILE_ARG("propertiesFile", "recipe properties file path"),
+    RECIPE_PROCESS_XML_FILE_PATH_ARG(
+            "recipeProcessFilePath", "file path of recipe process to be submitted");
+
+    private final String name;
+    private final String description;
+    private final boolean isRequired;
+    RecipeToolArgs(String name, String description) {
+        this(name, description, true);
+    }
+
+    RecipeToolArgs(String name, String description, boolean isRequired) {
+        this.name = name;
+        this.description = description;
+        this.isRequired = isRequired;
+    }
+
+    public Option getOption() {
+        return new Option(this.name, true, this.description);
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public boolean isRequired() {
+        return isRequired;
+    }
+
+    public String getOptionValue(CommandLine cmd) {
+        return cmd.getOptionValue(this.name);
+    }
+
+    @Override
+    public String toString() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
new file mode 100644
index 0000000..a1c29cd
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+/**
+ * Recipe tool options.
+ */
+public enum RecipeToolOptions {
+    SOURCE_CLUSTER_HDFS_WRITE_ENDPOINT(
+            "falcon.recipe.src.cluster.hdfs.writeEndPoint", "source cluster HDFS write endpoint"),
+    WORKFLOW_PATH("falcon.recipe.workflow.path", "Workflow path", false),
+    WORKFLOW_LIB_PATH("falcon.recipe.workflow.lib.path", "WF lib path", false);
+
+    private final String name;
+    private final String description;
+    private final boolean isRequired;
+
+    RecipeToolOptions(String name, String description) {
+        this(name, description, true);
+    }
+
+    RecipeToolOptions(String name, String description, boolean isRequired) {
+        this.name = name;
+        this.description = description;
+        this.isRequired = isRequired;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public boolean isRequired() {
+        return isRequired;
+    }
+
+    @Override
+    public String toString() {
+        return getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index 3b945e9..afa8136 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -300,3 +300,22 @@ $FALCON_HOME/bin/falcon admin -version
 Status returns the current state of Falcon (running or stopped).
 Usage:
 $FALCON_HOME/bin/falcon admin -status
+
+---++ Recipe Options
+
+---+++ Submit Recipe
+
+Submit the specified recipe.
+
+Usage:
+$FALCON_HOME/bin/falcon recipe -name <name>
+Name of the recipe. User should have defined <name>-template.xml and <name>.properties in the path specified by falcon.recipe.path in client.properties file. falcon.home path is used if it is not specified in client.properties file.
+If it is not specified in client.properties file and the files cannot be found at falcon.home, Falcon CLI will fail.
+
+Optional Args : -tool <recipeToolClassName>
+Falcon provides a base tool that recipes can override. If this option is not specified, the default recipe tool,
+RecipeTool, is used. This option is required if the user defines their own recipe tool class.
+
+Example:
+$FALCON_HOME/bin/falcon recipe -name hdfs-replication
+

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/docs/src/site/twiki/FalconDocumentation.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconDocumentation.twiki b/docs/src/site/twiki/FalconDocumentation.twiki
index 8d2e10e..bff32c6 100644
--- a/docs/src/site/twiki/FalconDocumentation.twiki
+++ b/docs/src/site/twiki/FalconDocumentation.twiki
@@ -14,6 +14,7 @@
    * <a href="#Falcon_EL_Expressions">Falcon EL Expressions</a>
    * <a href="#Lineage">Lineage</a>
    * <a href="#Security">Security</a>
+   * <a href="#Recipes">Recipes</a>
 
 ---++ Architecture
 ---+++ Introduction
@@ -740,11 +741,15 @@ This feature is enabled by default but could be disabled by removing the followi
 <verbatim>
 config name: *.application.services
 config value: org.apache.falcon.metadata.MetadataMappingService
-<verbatim>
+</verbatim>
 
 Lineage is only captured for Process executions. A future release will capture lineage for
 lifecycle policies such as replication and retention.
 
---++ Security
+---++Security
 
 Security is detailed in [[Security][Security]].
+
+---++ Recipes
+
+Recipes are detailed in [[Recipes][Recipes]].

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/docs/src/site/twiki/recipes.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/recipes.twiki b/docs/src/site/twiki/recipes.twiki
new file mode 100644
index 0000000..e733889
--- /dev/null
+++ b/docs/src/site/twiki/recipes.twiki
@@ -0,0 +1,71 @@
+---+ Falcon Recipes
+
+---++ Overview
+
+A Falcon recipe is a static process template with parameterized workflow to realize a specific use case. Recipes are
+defined in user space. Recipes will not have support for update or lifecycle management.
+
+For example:
+
+   * Replicating directories from one HDFS cluster to another (not timed partitions)
+   * Replicating hive metadata (database, table, views, etc.)
+   * Replicating between HDFS and Hive - either way
+   * Data masking etc.
+
+---++ Proposal
+
+Falcon provides a Process abstraction that encapsulates the configuration for a user workflow with scheduling
+controls. All recipes can be modeled as a Process within Falcon which executes the user workflow periodically. The
+process and its associated workflow are parameterized. The user will provide a properties file with name value pairs
+that are substituted by falcon before scheduling it. Falcon translates these recipes as a process entity by
+replacing the parameters in the workflow definition.
+
+---++ Falcon CLI recipe support
+
+Falcon CLI functionality to support recipes has been added.
+<a href="./FalconCLI.html">Recipe command usage is defined here.</a>
+
+CLI accepts recipe option with a recipe name and optional tool and does the following:
+   * Validates the options, name option is mandatory and tool is optional and should be provided if user wants to override the base recipe tool
+   * Looks for <name>-template.xml and <name>.properties file in the path specified by falcon.recipe.path in client.properties or falcon.home. If files cannot be found then Falcon CLI will fail
+   * Invokes a Tool to substitute the properties in the templated process for the recipe. By default invokes base tool if tool option is not passed. Tool is responsible for generating process entity at the path specified by FalconCLI
+   * Validates the generated entity
+   * Submit and schedule this entity
+   * Generated process entity files are stored in tmp directory
+
+---++ Base Recipe tool
+
+Falcon provides a base tool that recipes can override. Base Recipe tool does the following:
+   * Expects recipe template file path, recipe properties file path and path where process entity to be submitted should be generated. Validates these arguments
+   * Validates that the artifacts, i.e. workflow and/or lib files specified in the recipe template, exist on local filesystem or HDFS at the specified path; else returns error
+   * Copies the artifacts if they exist only on local filesystem
+      * If artifacts already exist on HDFS then hdfs path should be included in recipe template file. If the artifacts are on local filesystem then falcon.recipe.workflow.path and falcon.recipe.workflow.lib.path are mandatory properties in recipe property file. Recipe tool will copy the local artifacts only if these properties are set in properties file
+   * Looks for the pattern ##[A-Za-z0-9_.]*## in the templated process and substitutes it with the properties. Process entity generated after the substitution is written to the empty file passed by FalconCLI
+
+---++ Recipe template file format
+
+   * Any templatized string should be in the format ##[A-Za-z0-9_.]*##.
+   * There should be a corresponding entry in the recipe properties file "falcon.recipe.<templatized-string> = <value to be substituted>"
+
+<verbatim>
+Example: If the entry in recipe template is <workflow name="##workflow.name##"> there should be a corresponding entry in the recipe properties file falcon.recipe.workflow.name=hdfs-dr-workflow
+</verbatim>
+
+---++ Recipe properties file format
+
+   * Regular key value pair properties file
+   * Property key should be prefixed by "falcon.recipe."
+
+<verbatim>
+Example: falcon.recipe.workflow.name=hdfs-dr-workflow
+Recipe template will have <workflow name="##workflow.name##">. Recipe tool will look for the pattern ##workflow.name##
+and replace it with the property value "hdfs-dr-workflow". Substituted template will have <workflow name="hdfs-dr-workflow">
+</verbatim>
+
+---++ Sample recipes
+
+   * Sample recipes are published in addons/recipes
+
+---++ Packaging
+
+   * There is no packaging for recipes at this time but will be added soon.

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 82208b0..caee676 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.cli;
 
+import org.apache.commons.io.FilenameUtils;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.resource.TestContext;
 import org.apache.falcon.util.OozieTestUtils;
@@ -27,11 +28,13 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.io.IOException;
 import java.util.Date;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * Test for Falcon CLI.
@@ -40,8 +43,10 @@ import java.util.Map;
  */
 @Test(groups = {"exhaustive"})
 public class FalconCLIIT {
+    private static final String RECIPE_PROPERTIES_FILE_XML = "/process.properties";
 
     private InMemoryWriter stream = new InMemoryWriter(System.out);
+    private String recipePropertiesFilePath;
 
     @BeforeClass
     public void prepare() throws Exception {
@@ -875,6 +880,65 @@ public class FalconCLIIT {
                         + " -filterBy STATUS:SUCCEEDED -orderBy wrongOrder -offset 0 -numResults 1"));
     }
 
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    @Test
+    public void testRecipeCommand() throws Exception {
+        recipeSetup();
+        try {
+            Assert.assertEquals(executeWithURL("recipe -name " + "process"), 0);
+        } finally {
+            if (recipePropertiesFilePath != null) {
+                new File(recipePropertiesFilePath).delete();
+            }
+        }
+    }
+
+    private void recipeSetup() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        createPropertiesFile(context);
+        String filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(),
+                overlay);
+        Assert.assertEquals(0,
+                executeWithURL("entity -submit -type cluster -file " + filePath));
+        context.setCluster(overlay.get("cluster"));
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        Assert.assertEquals(0,
+                executeWithURL("entity -submit -type feed -file " + filePath));
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        Assert.assertEquals(0,
+                executeWithURL("entity -submit -type feed -file " + filePath));
+    }
+
+    private void createPropertiesFile(TestContext context) throws Exception  {
+        InputStream in = this.getClass().getResourceAsStream(RECIPE_PROPERTIES_FILE_XML);
+        Properties props = new Properties();
+        props.load(in);
+        in.close();
+
+        String wfFile = TestContext.class.getResource("/fs-workflow.xml").getPath();
+        String resourcePath = FilenameUtils.getFullPathNoEndSeparator(wfFile);
+        String libPath = TestContext.getTempFile("target/lib", "recipe", ".jar").getAbsolutePath();
+
+        File file = new File(resourcePath, "process.properties");
+        OutputStream out = new FileOutputStream(file);
+        props.setProperty("falcon.recipe.processName", context.getProcessName());
+        props.setProperty("falcon.recipe.src.cluster.name", context.getClusterName());
+        props.setProperty("falcon.recipe.inputFeedName", context.getInputFeedName());
+        props.setProperty("falcon.recipe.outputFeedName", context.getOutputFeedName());
+        props.setProperty("falcon.recipe.workflow.path", TestContext.class.getResource("/fs-workflow.xml").getPath());
+        props.setProperty("falcon.recipe.workflow.lib.path", new File(libPath).getParent());
+        props.setProperty("falcon.recipe.src.cluster.hdfs.writeEndPoint", "jail://global:00");
+
+        props.store(out, null);
+        out.close();
+
+        recipePropertiesFilePath = file.getAbsolutePath();
+    }
+
     private int executeWithURL(String command) throws Exception {
         //System.out.println("COMMAND IS "+command + " -url " + TestContext.BASE_URL);
         return new FalconCLI()

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 40da789..d1b320f 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -103,9 +103,10 @@ public class TestContext {
 
     protected String clusterName;
     protected String processName;
+    protected String inputFeedName;
     protected String outputFeedName;
 
-    public static final Pattern VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_]*##");
+    public static final Pattern VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##");
 
     public TestContext() {
         try {
@@ -188,6 +189,14 @@ public class TestContext {
         return processName;
     }
 
+    public String getInputFeedName() {
+        return inputFeedName;
+    }
+
+    public String getOutputFeedName() {
+        return outputFeedName;
+    }
+
     public String getClusterFileTemplate() {
         return CLUSTER_TEMPLATE;
     }
@@ -338,29 +347,42 @@ public class TestContext {
     }
 
     public static File getTempFile() throws IOException {
-        File target = new File("webapp/target");
-        if (!target.exists()) {
-            target = new File("target");
+        return getTempFile("test", ".xml");
+    }
+
+    public static File getTempFile(String prefix, String suffix) throws IOException {
+        return getTempFile("target", prefix, suffix);
+    }
+
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static File getTempFile(String path, String prefix, String suffix) throws IOException {
+        File f = new File(path);
+        if (!f.exists()) {
+            f.mkdirs();
         }
 
-        return File.createTempFile("test", ".xml", target);
+        return File.createTempFile(prefix, suffix, f);
     }
 
     public Map<String, String> getUniqueOverlay() throws FalconException {
         Map<String, String> overlay = new HashMap<String, String>();
         long time = System.currentTimeMillis();
         clusterName = "cluster" + time;
+        overlay.put("src.cluster.name", clusterName);
         overlay.put("cluster", clusterName);
         overlay.put("colo", DeploymentUtil.getCurrentColo());
-        overlay.put("inputFeedName", "in" + time);
+        inputFeedName = "in" + time;
+        overlay.put("inputFeedName", inputFeedName);
         //only feeds with future dates can be scheduled
         Date endDate = new Date(System.currentTimeMillis() + 15 * 60 * 1000);
         overlay.put("feedEndDate", SchemaHelper.formatDateUTC(endDate));
-        overlay.put("outputFeedName", "out" + time);
+        outputFeedName = "out" + time;
+        overlay.put("outputFeedName", outputFeedName);
         processName = "p" + time;
         overlay.put("processName", processName);
-        outputFeedName = "out" + time;
         overlay.put("user", System.getProperty("user.name"));
+        overlay.put("workflow.path", "/falcon/test/workflow");
+        overlay.put("workflow.lib.path", "/falcon/test/workflow/lib");
         return overlay;
     }
 
@@ -405,6 +427,7 @@ public class TestContext {
         fs.delete(wfParent, true);
         Path wfPath = new Path(wfParent, "workflow");
         mkdir(fs, wfPath);
+        mkdir(fs, new Path("/falcon/test/workflow/lib"));
         fs.copyFromLocalFile(false, true,
                 new Path(TestContext.class.getResource("/fs-workflow.xml").getPath()),
                 new Path(wfPath, "workflow.xml"));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/webapp/src/test/resources/client.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/client.properties b/webapp/src/test/resources/client.properties
index 6ecaa87..c9eecf5 100644
--- a/webapp/src/test/resources/client.properties
+++ b/webapp/src/test/resources/client.properties
@@ -16,4 +16,5 @@
 # limitations under the License.
 #
 
-falcon.url=http://localhost:41000/
\ No newline at end of file
+falcon.url=http://localhost:41000/
+falcon.recipe.path=target/test-classes
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/webapp/src/test/resources/process-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/process-template.xml b/webapp/src/test/resources/process-template.xml
index 06215a2..0add06a 100644
--- a/webapp/src/test/resources/process-template.xml
+++ b/webapp/src/test/resources/process-template.xml
@@ -21,7 +21,7 @@
     <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
     <pipelines>testPipeline,dataReplicationPipeline</pipelines>
     <clusters>
-        <cluster name="##cluster##">
+        <cluster name="##src.cluster.name##">
             <validity end="2012-04-21T00:00Z" start="2012-04-20T00:00Z"/>
         </cluster>
     </clusters>
@@ -43,10 +43,10 @@
         <property name="baseTime" value="${today(0,0)}"/>
         <property name="sundayThisWeek" value="${currentWeek('SUN', 0, 0)}"/>
     </properties>
-    <workflow engine="oozie" path="/falcon/test/workflow"/>
+    <workflow engine="oozie" path="##workflow.path##" lib="##workflow.lib.path##"/>
     <retry policy="periodic" delay="minutes(10)" attempts="3"/>
 
     <late-process policy="exp-backoff" delay="hours(1)">
         <late-input input="input" workflow-path="/falcon/test/workflow"/>
     </late-process>
-</process>
\ No newline at end of file
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c54c1ef2/webapp/src/test/resources/process.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/process.properties b/webapp/src/test/resources/process.properties
new file mode 100644
index 0000000..1eb282c
--- /dev/null
+++ b/webapp/src/test/resources/process.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+falcon.recipe.src.cluster.hdfs.writeEndPoint=jail://global:00
+falcon.recipe.workflow.path=
+falcon.recipe.processName=
+falcon.recipe.src.cluster.name=
+falcon.recipe.inputFeedName=
+falcon.recipe.outputFeedName=
+


Mime
View raw message