asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ima...@apache.org
Subject [2/4] incubator-asterixdb git commit: YARN integration for AsterixDB
Date Tue, 30 Jun 2015 06:50:37 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/69375a19/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java
----------------------------------------------------------------------
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java
new file mode 100644
index 0000000..63b85e6
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java
@@ -0,0 +1,1387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.aoya;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.regex.Pattern;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.collect.ImmutableMap;
+
+import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.common.configuration.Coredump;
+import edu.uci.ics.asterix.common.configuration.Store;
+import edu.uci.ics.asterix.common.configuration.TransactionLogDir;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class AsterixYARNClient {
+
+    public static enum Mode {
+        INSTALL("install"),
+        START("start"),
+        STOP("stop"),
+        KILL("kill"),
+        DESTROY("destroy"),
+        ALTER("alter"),
+        LIBINSTALL("libinstall"),
+        DESCRIBE("describe"),
+        BACKUP("backup"),
+        LSBACKUP("lsbackups"),
+        RMBACKUP("rmbackup"),
+        RESTORE("restore"),
+        NOOP("");
+
+        public final String alias;
+
+        Mode(String alias) {
+            this.alias = alias;
+        }
+
+        public static Mode fromAlias(String a) {
+            return STRING_TO_MODE.get(a.toLowerCase());
+        }
+    }
+
+    public static final Map<String, AsterixYARNClient.Mode> STRING_TO_MODE = ImmutableMap
+            .<String, AsterixYARNClient.Mode> builder().put(Mode.INSTALL.alias, Mode.INSTALL)
+            .put(Mode.START.alias, Mode.START).put(Mode.STOP.alias, Mode.STOP).put(Mode.KILL.alias, Mode.KILL)
+            .put(Mode.DESTROY.alias, Mode.DESTROY).put(Mode.ALTER.alias, Mode.ALTER)
+            .put(Mode.LIBINSTALL.alias, Mode.LIBINSTALL).put(Mode.DESCRIBE.alias, Mode.DESCRIBE)
+            .put(Mode.BACKUP.alias, Mode.BACKUP).put(Mode.LSBACKUP.alias, Mode.LSBACKUP)
+            .put(Mode.RMBACKUP.alias, Mode.RMBACKUP).put(Mode.RESTORE.alias, Mode.RESTORE).build();
+    private static final Log LOG = LogFactory.getLog(AsterixYARNClient.class);
+    public static final String CONF_DIR_REL = ".asterix" + File.separator;
+    private static final String instanceLock = "instance";
+    public static final String CONFIG_DEFAULT_NAME = "cluster-config.xml";
+    public static final String PARAMS_DEFAULT_NAME = "asterix-configuration.xml";
+    private static String DEFAULT_PARAMETERS_PATH = "conf" + File.separator + "base-asterix-configuration.xml";
+    private static String MERGED_PARAMETERS_PATH = "conf" + File.separator + PARAMS_DEFAULT_NAME;
+    private static final String JAVA_HOME = System.getProperty("java.home");
+    public static final String NC_JAVA_OPTS_KEY = "nc.java.opts";
+    public static final String CC_JAVA_OPTS_KEY = "cc.java.opts";
+    public static final String CC_REST_PORT_KEY = "api.port";
+    private Mode mode = Mode.NOOP;
+
+    // Hadoop Configuration
+    private Configuration conf;
+    private YarnClient yarnClient;
+    // Application master specific info to register a new Application with
+    // RM/ASM
+    private String appName = "";
+    // App master priority
+    private int amPriority = 0;
+    // Queue for App master
+    private String amQueue = "";
+    // Amt. of memory resource to request for to run the App Master
+    private int amMemory = 1000;
+
+    // Main class to invoke application master
+    private final String appMasterMainClass = "edu.uci.ics.asterix.aoya.AsterixApplicationMaster";
+
+    //instance name
+    private String instanceName = "";
+    //location of distributable AsterixDB zip
+    private String asterixZip = "";
+    // Location of cluster configuration
+    private String asterixConf = "";
+    // Location of optional external libraries
+    private String extLibs = "";
+
+    private String instanceFolder = "";
+
+    // log4j.properties file
+    // if available, add to local resources and set into classpath
+    private String log4jPropFile = "";
+
+    // Debug flag
+    boolean debugFlag = false;
+    private boolean refresh = false;
+    private boolean force = false;
+
+    // Command line options
+    private Options opts;
+    private String libDataverse;
+    private String snapName = "";
+    private String baseConfig = ".";
+    private String ccJavaOpts = "";
+    private String ncJavaOpts = "";
+
+    //Ports
+    private int ccRestPort = 19002;
+
+    /**
+     * @param args
+     *            Command line arguments
+     */
+    public static void main(String[] args) {
+
+        try {
+            AsterixYARNClient client = new AsterixYARNClient();
+            try {
+                client.init(args);
+                AsterixYARNClient.execute(client);
+            } catch (ParseException | ApplicationNotFoundException e) {
+                LOG.fatal(e);
+                client.printUsage();
+                System.exit(-1);
+            }
+        } catch (Exception e) {
+            LOG.fatal("Error running client", e);
+            System.exit(1);
+        }
+        LOG.info("Command executed successfully.");
+        System.exit(0);
+    }
+
+    public static void execute(AsterixYARNClient client) throws IOException, YarnException {
+        YarnClientApplication app;
+        List<DFSResourceCoordinate> res;
+
+        System.out.println("JAVA HOME: " + JAVA_HOME);
+        switch (client.mode) {
+            case START:
+                startAction(client);
+                break;
+            case STOP:
+                try {
+                    client.stopInstance();
+                } catch (ApplicationNotFoundException e) {
+                    LOG.info(e);
+                    System.out.println("Asterix instance by that name already exited or was never started");
+                    client.deleteLockFile();
+                }
+                break;
+            case KILL:
+                if (client.isRunning() &&
+                    Utils.confirmAction("Are you sure you want to kill this instance? In-progress tasks will be aborted")) {
+                    try {
+                        AsterixYARNClient.killApplication(client.getLockFile(), client.yarnClient);
+                    } catch (ApplicationNotFoundException e) {
+                        LOG.info(e);
+                        System.out.println("Asterix instance by that name already exited or was never started");
+                        client.deleteLockFile();
+                    }
+                }
+                else if(!client.isRunning()){
+                    System.out.println("Asterix instance by that name already exited or was never started");
+                    client.deleteLockFile();
+                }
+                break;
+            case DESCRIBE:
+                Utils.listInstances(client.conf, CONF_DIR_REL);
+                break;
+            case INSTALL:
+                installAction(client);
+                break;
+            case LIBINSTALL:
+                client.installExtLibs();
+                break;
+            case ALTER:
+                client.writeAsterixConfig(Utils.parseYarnClusterConfig(client.asterixConf));
+                client.installAsterixConfig(true);
+                System.out.println("Configuration successfully modified");
+                break;
+            case DESTROY:
+                try {
+                    if (client.force
+                            || Utils.confirmAction("Are you really sure you want to obliterate this instance? This action cannot be undone!")) {
+                        app = client.makeApplicationContext();
+                        res = client.deployConfig();
+                        res.addAll(client.distributeBinaries());
+                        client.removeInstance(app, res);
+                    }
+                } catch (YarnException | IOException e) {
+                    LOG.error("Asterix failed to deploy on to cluster");
+                    throw e;
+                }
+                break;
+            case BACKUP:
+                if (client.force || Utils.confirmAction("Performing a backup will stop a running instance.")) {
+                    app = client.makeApplicationContext();
+                    res = client.deployConfig();
+                    res.addAll(client.distributeBinaries());
+                    client.backupInstance(app, res);
+                }
+                break;
+            case LSBACKUP:
+                Utils.listBackups(client.conf, CONF_DIR_REL, client.instanceName);
+                break;
+            case RMBACKUP:
+                Utils.rmBackup(client.conf, CONF_DIR_REL, client.instanceName, Long.parseLong(client.snapName));
+                break;
+            case RESTORE:
+                if (client.force || Utils.confirmAction("Performing a restore will stop a running instance.")) {
+                    app = client.makeApplicationContext();
+                    res = client.deployConfig();
+                    res.addAll(client.distributeBinaries());
+                    client.restoreInstance(app, res);
+                }
+                break;
+            default:
+                LOG.fatal("Unknown mode. Known client modes are: start, stop, install, describe, kill, destroy, describe, backup, restore, lsbackup, rmbackup");
+                client.printUsage();
+                System.exit(-1);
+        }
+    }
+
+    private static void startAction(AsterixYARNClient client) throws YarnException {
+        YarnClientApplication app;
+        List<DFSResourceCoordinate> res;
+        ApplicationId appId;
+        try {
+            app = client.makeApplicationContext();
+            res = client.deployConfig();
+            res.addAll(client.distributeBinaries());
+            appId = client.deployAM(app, res, client.mode);
+            LOG.info("Asterix started up with Application ID: " + appId.toString());
+            if (Utils.waitForLiveness(appId, "Waiting for AsterixDB instance to resume ", client.yarnClient,
+                    client.instanceName, client.conf, client.ccRestPort)) {
+                System.out.println("Asterix successfully deployed and is now running.");
+            } else {
+                LOG.fatal("AsterixDB appears to have failed to install and start");
+                throw new YarnException("AsterixDB appears to have failed to install and start");
+            }
+        } catch (IOException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    private static void installAction(AsterixYARNClient client) throws YarnException {
+        YarnClientApplication app;
+        List<DFSResourceCoordinate> res;
+        ApplicationId appId;
+        try {
+            app = client.makeApplicationContext();
+            client.installConfig();
+            client.writeAsterixConfig(Utils.parseYarnClusterConfig(client.asterixConf));
+            client.installAsterixConfig(false);
+            res = client.deployConfig();
+            res.addAll(client.distributeBinaries());
+
+            appId = client.deployAM(app, res, client.mode);
+            LOG.info("Asterix started up with Application ID: " + appId.toString());
+            if (Utils.waitForLiveness(appId, "Waiting for new AsterixDB Instance to start ", client.yarnClient,
+                    client.instanceName, client.conf, client.ccRestPort)) {
+                System.out.println("Asterix successfully deployed and is now running.");
+            } else {
+                LOG.fatal("AsterixDB appears to have failed to install and start");
+                throw new YarnException("AsterixDB appears to have failed to install and start");
+            }
+        } catch (IOException e) {
+            LOG.fatal("Asterix failed to deploy on to cluster");
+            throw new YarnException(e);
+        }
+    }
+
+    public AsterixYARNClient(Configuration conf) throws Exception {
+
+        this.conf = conf;
+        yarnClient = YarnClient.createYarnClient();
+        //If the HDFS jars aren't on the classpath this won't be set 
+        if (conf.get("fs.hdfs.impl", null) == conf.get("fs.file.impl", null)) { //only would happen if both are null
+            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+            conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+        }
+        yarnClient.init(conf);
+        opts = parseConf(conf);
+    }
+
+    private static Options parseConf(Configuration conf) {
+        Options opts = new Options();
+        opts.addOption(new Option("appname", true, "Application Name. Default value - Asterix"));
+        opts.addOption(new Option("priority", true, "Application Priority. Default 0"));
+        opts.addOption(new Option("queue", true, "RM Queue in which this application is to be submitted"));
+        opts.addOption(new Option("master_memory", true,
+                "Amount of memory in MB to be requested to run the application master"));
+        opts.addOption(new Option("log_properties", true, "log4j.properties file"));
+        opts.addOption(new Option("n", "name", true, "Asterix instance name (required)"));
+        opts.addOption(new Option("zip", "asterixZip", true,
+                "zip file with AsterixDB inside- if in non-default location"));
+        opts.addOption(new Option("bc", "baseConfig", true,
+                "base Asterix parameters configuration file if not in default position"));
+        opts.addOption(new Option("c", "asterixConf", true, "Asterix cluster config (required on install)"));
+        opts.addOption(new Option("l", "externalLibs", true, "Libraries to deploy along with Asterix instance"));
+        opts.addOption(new Option("ld", "libDataverse", true, "Dataverse to deploy external libraries to"));
+        opts.addOption(new Option("r", "refresh", false,
+                "If starting an existing instance, this will replace them with the local copy on startup"));
+        opts.addOption(new Option("appId", true, "ApplicationID to monitor if running client in status monitor mode"));
+        opts.addOption(new Option("masterLibsDir", true, "Directory that contains the JARs needed to run the AM"));
+        opts.addOption(new Option("s", "snapshot", true,
+                "Backup timestamp for arguments requiring a specific backup (rm, restore)"));
+        opts.addOption(new Option("v", "debug", false, "Dump out debug information"));
+        opts.addOption(new Option("help", false, "Print usage"));
+        opts.addOption(new Option("f", "force", false,
+                "Execute this command as fully as possible, disregarding any caution"));
+        return opts;
+    }
+
+    /**
+   */
+    public AsterixYARNClient() throws Exception {
+        this(new YarnConfiguration());
+    }
+
+    /**
+     * Helper function to print out usage
+     */
+    private void printUsage() {
+        new HelpFormatter().printHelp("Asterix YARN client. Usage: asterix [options] [mode]", opts);
+    }
+
+    /**
+     * Initialize the client's arguments and parameters before execution.
+     * 
+     * @param args
+     *            - Standard command-line arguments.
+     * @throws ParseException
+     */
+    public void init(String[] args) throws ParseException {
+
+        CommandLine cliParser = new GnuParser().parse(opts, args);
+        if (cliParser.hasOption("help")) {
+            printUsage();
+            return;
+        }
+        //initialize most things
+        debugFlag = cliParser.hasOption("debug");
+        force = cliParser.hasOption("force");
+        baseConfig = cliParser.getOptionValue("baseConfig");
+        extLibs = cliParser.getOptionValue("externalLibs");
+        libDataverse = cliParser.getOptionValue("libDataverse");
+
+        appName = cliParser.getOptionValue("appname", "AsterixDB");
+        amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+        amQueue = cliParser.getOptionValue("queue", "default");
+        amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
+
+        instanceName = cliParser.getOptionValue("name");
+        instanceFolder = instanceName + '/';
+        appName = appName + ": " + instanceName;
+
+        asterixConf = cliParser.getOptionValue("asterixConf");
+
+        log4jPropFile = cliParser.getOptionValue("log_properties", "");
+
+        //see if the given argument values are sane in general
+        checkConfSanity(args, cliParser);
+
+        //intialize the mode, see if it is a valid one.
+        initMode(args, cliParser);
+
+        //now check the validity of the arguments given the mode
+        checkModeSanity(args, cliParser);
+
+        //if we are going to refresh the binaries, find that out
+        refresh = cliParser.hasOption("refresh");
+        //same goes for snapshot restoration/removal
+        snapName = cliParser.getOptionValue("snapshot");
+
+        if (!cliParser.hasOption("asterixZip")
+                && (mode == Mode.INSTALL || mode == Mode.ALTER || mode == Mode.DESTROY || mode == Mode.BACKUP)) {
+
+            asterixZip = cliParser.getOptionValue("asterixZip", getAsterixDistributableLocation().getAbsolutePath());
+        } else {
+            asterixZip = cliParser.getOptionValue("asterixZip");
+        }
+
+    }
+
+    /**
+     * Cursory sanity checks for argument sanity, without considering the mode of the client
+     * 
+     * @param args
+     * @param cliParser
+     *            The parsed arguments.
+     * @throws ParseException
+     */
+    private void checkConfSanity(String[] args, CommandLine cliParser) throws ParseException {
+        String message = null;
+
+        //Sanity check for no args 
+        if (args.length == 0) {
+            message = "No args specified for client to initialize";
+        }
+        //AM memory should be a sane value
+        else if (amMemory < 0) {
+            message = "Invalid memory specified for application master, exiting." + " Specified memory=" + amMemory;
+        }
+        //we're good!
+        else {
+            return;
+        }
+        //default:
+        throw new ParseException(message);
+
+    }
+
+    /**
+     * Initialize the mode of the client from the arguments.
+     * 
+     * @param args
+     * @param cliParser
+     * @throws ParseException
+     */
+    private void initMode(String[] args, CommandLine cliParser) throws ParseException {
+        @SuppressWarnings("unchecked")
+        List<String> clientVerb = cliParser.getArgList();
+        String message = null;
+        //Now check if there is a mode
+        if (clientVerb == null || clientVerb.size() < 1) {
+            message = "You must specify an action.";
+        }
+        //But there can only be one mode...
+        else if (clientVerb.size() > 1) {
+            message = "Trailing arguments, or too many arguments. Only one action may be performed at a time.";
+        }
+        if (message != null) {
+            throw new ParseException(message);
+        }
+        //Now we can initialize the mode and check it against parameters
+        mode = Mode.fromAlias(clientVerb.get(0));
+        if (mode == null) {
+            mode = Mode.NOOP;
+        }
+    }
+
+    /**
+     * Determine if the command line arguments are sufficient for the requested client mode.
+     * 
+     * @param args
+     *            The command line arguments.
+     * @param cliParser
+     *            Parsed command line arguments.
+     * @throws ParseException
+     */
+
+    private void checkModeSanity(String[] args, CommandLine cliParser) throws ParseException {
+
+        String message = null;
+        //The only time you can use the client without specifiying an instance, is to list all of the instances it sees.
+        if (!cliParser.hasOption("name") && mode != Mode.DESCRIBE) {
+            message = "You must give a name for the instance to be acted upon";
+        } else if (mode == Mode.INSTALL && !cliParser.hasOption("asterixConf")) {
+            message = "No Configuration XML given. Please specify a config for cluster installation";
+        } else if (mode != Mode.START && cliParser.hasOption("refresh")) {
+            message = "Cannot specify refresh in any mode besides start, mode is: " + mode;
+        } else if (cliParser.hasOption("snapshot") && !(mode == Mode.RESTORE || mode == Mode.RMBACKUP)) {
+            message = "Cannot specify a snapshot to restore in any mode besides restore or rmbackup, mode is: " + mode;
+        } else if ((mode == Mode.ALTER || mode == Mode.INSTALL) && baseConfig == null
+                && !(new File(DEFAULT_PARAMETERS_PATH).exists())) {
+            message = "Default asterix parameters file is not in the default location, and no custom location is specified";
+        }
+        //nothing is wrong, so exit
+        else {
+            return;
+        }
+        //otherwise, something is bad.
+        throw new ParseException(message);
+
+    }
+
+    /**
+     * Find the distributable asterix bundle, be it in the default location or in a user-specified location.
+     * 
+     * @return
+     */
+    private File getAsterixDistributableLocation() {
+        //Look in the PWD for the "asterix" folder
+        File tarDir = new File("asterix");
+        if (!tarDir.exists()) {
+            throw new IllegalArgumentException(
+                    "Default directory structure not in use- please specify an asterix zip and base config file to distribute");
+        }
+        FileFilter tarFilter = new WildcardFileFilter("asterix-server*.zip");
+        File[] tarFiles = tarDir.listFiles(tarFilter);
+        if (tarFiles.length != 1) {
+            throw new IllegalArgumentException(
+                    "There is more than one canonically named asterix distributable in the default directory. Please leave only one there.");
+        }
+        return tarFiles[0];
+    }
+
+    /**
+     * Initialize and register the application attempt with the YARN ResourceManager.
+     * 
+     * @return
+     * @throws IOException
+     * @throws YarnException
+     */
+    public YarnClientApplication makeApplicationContext() throws IOException, YarnException {
+
+        //first check to see if an instance already exists.
+        FileSystem fs = FileSystem.get(conf);
+        Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+        LOG.info("Running Deployment");
+        yarnClient.start();
+        if (fs.exists(lock)) {
+            ApplicationId lockAppId = getLockFile();
+            try {
+                ApplicationReport previousAppReport = yarnClient.getApplicationReport(lockAppId);
+                YarnApplicationState prevStatus = previousAppReport.getYarnApplicationState();
+                if (!(prevStatus == YarnApplicationState.FAILED || prevStatus == YarnApplicationState.KILLED || prevStatus == YarnApplicationState.FINISHED)
+                        && mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
+                    throw new IllegalStateException("Instance is already running in: " + lockAppId);
+                } else if (mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
+                    //stale lock file
+                    LOG.warn("Stale lockfile detected. Instance attempt " + lockAppId + " may have exited abnormally");
+                    deleteLockFile();
+                }
+            } catch (YarnException e) {
+                LOG.warn("Stale lockfile detected, but the RM has no record of this application's last run. This is normal if the cluster was restarted.");
+                deleteLockFile();
+            }
+        }
+
+        // Get a new application id
+        YarnClientApplication app = yarnClient.createApplication();
+        GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
+        int maxMem = appResponse.getMaximumResourceCapability().getMemory();
+        LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+        // A resource ask cannot exceed the max.
+        if (amMemory > maxMem) {
+            LOG.info("AM memory specified above max threshold of cluster. Using max value." + ", specified=" + amMemory
+                    + ", max=" + maxMem);
+            amMemory = maxMem;
+        }
+
+        // set the application name
+        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+        appContext.setApplicationName(appName);
+
+        return app;
+    }
+
+    /**
+     * Upload the Asterix cluster description on to the DFS. This will persist the state of the instance.
+     * 
+     * @return
+     * @throws YarnException
+     * @throws IOException
+     */
+    private List<DFSResourceCoordinate> deployConfig() throws YarnException, IOException {
+
+        FileSystem fs = FileSystem.get(conf);
+        List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+
+        String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
+        Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+        FileStatus destStatus;
+        try {
+            destStatus = fs.getFileStatus(dstConf);
+        } catch (IOException e) {
+            throw new YarnException("Asterix instance by that name does not appear to exist in DFS");
+        }
+        LocalResource asterixConfLoc = Records.newRecord(LocalResource.class);
+        asterixConfLoc.setType(LocalResourceType.FILE);
+        asterixConfLoc.setVisibility(LocalResourceVisibility.PRIVATE);
+        asterixConfLoc.setResource(ConverterUtils.getYarnUrlFromPath(dstConf));
+        asterixConfLoc.setTimestamp(destStatus.getModificationTime());
+
+        DFSResourceCoordinate conf = new DFSResourceCoordinate();
+        conf.envs.put(dstConf.toUri().toString(), AConstants.CONFLOCATION);
+        conf.envs.put(Long.toString(asterixConfLoc.getSize()), AConstants.CONFLEN);
+        conf.envs.put(Long.toString(asterixConfLoc.getTimestamp()), AConstants.CONFTIMESTAMP);
+        conf.name = CONFIG_DEFAULT_NAME;
+        conf.res = asterixConfLoc;
+        resources.add(conf);
+
+        return resources;
+
+    }
+
+    /**
+     * Install the current Asterix parameters to the DFS. This can be modified via alter.
+     * 
+     * @throws YarnException
+     * @throws IOException
+     */
+    private void installConfig() throws YarnException, IOException {
+        FileSystem fs = FileSystem.get(conf);
+        String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
+        Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+        try {
+            fs.getFileStatus(dstConf);
+            if (mode == Mode.INSTALL) {
+                throw new IllegalStateException("Instance with this name already exists.");
+            }
+        } catch (FileNotFoundException e) {
+            if (mode == Mode.START) {
+                throw new IllegalStateException("Instance does not exist for this user", e);
+            }
+        }
+        if (mode == Mode.INSTALL) {
+            Path src = new Path(asterixConf);
+            fs.copyFromLocalFile(false, true, src, dstConf);
+        }
+
+    }
+
+    /**
+     * Upload External libraries and functions to HDFS for an instance to use when started
+     * @throws IllegalStateException
+     * @throws IOException
+     */
+
+    private void installExtLibs() throws IllegalStateException, IOException {
+        FileSystem fs = FileSystem.get(conf);
+        if (!instanceExists()) {
+            throw new IllegalStateException("No instance by name " + instanceName + " found.");
+        }
+        if (isRunning()) {
+            throw new IllegalStateException("Instance " + instanceName
+                    + " is running. Please stop it before installing any libraries.");
+        }
+        String libPathSuffix = CONF_DIR_REL + instanceFolder + "library" + Path.SEPARATOR + libDataverse
+                + Path.SEPARATOR;
+        Path src = new Path(extLibs);
+        String fullLibPath = libPathSuffix + src.getName();
+        Path libFilePath = new Path(fs.getHomeDirectory(), fullLibPath);
+        LOG.info("Copying Asterix external library to DFS");
+        fs.copyFromLocalFile(false, true, src, libFilePath);
+    }
+
+    /**
+     * Finds the minimal classes and JARs needed to start the AM only.
+     * @return Resources the AM needs to start on the initial container.
+     * @throws IllegalStateException
+     * @throws IOException
+     */
+    private List<DFSResourceCoordinate> installAmLibs() throws IllegalStateException, IOException {
+        List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+        FileSystem fs = FileSystem.get(conf);
+        String fullLibPath = CONF_DIR_REL + instanceFolder + "am_jars" + Path.SEPARATOR;
+        String[] cp = System.getProperty("java.class.path").split(System.getProperty("path.separator"));
+        String asterixJarPattern = "^(asterix).*(jar)$"; //starts with asterix,ends with jar
+        String commonsJarPattern = "^(commons).*(jar)$";
+        String surefireJarPattern = "^(surefire).*(jar)$"; //for maven tests
+        String jUnitTestPattern = "^(asterix-yarn" + File.separator + "target)$";
+
+        LOG.info(File.separator);
+        for (String j : cp) {
+            String[] pathComponents = j.split(Pattern.quote(File.separator));
+            LOG.info(j);
+            LOG.info(pathComponents[pathComponents.length - 1]);
+            if (pathComponents[pathComponents.length - 1].matches(asterixJarPattern)
+                    || pathComponents[pathComponents.length - 1].matches(commonsJarPattern)
+                    || pathComponents[pathComponents.length - 1].matches(surefireJarPattern)
+                    || pathComponents[pathComponents.length - 1].matches(jUnitTestPattern)) {
+                LOG.info("Loading JAR/classpath: " + j);
+                File f = new File(j);
+                Path dst = new Path(fs.getHomeDirectory(), fullLibPath + f.getName());
+                if (!fs.exists(dst) || refresh) {
+                    fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), dst);
+                }
+                FileStatus dstSt = fs.getFileStatus(dst);
+                LocalResource amLib = Records.newRecord(LocalResource.class);
+                amLib.setType(LocalResourceType.FILE);
+                amLib.setVisibility(LocalResourceVisibility.PRIVATE);
+                amLib.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+                amLib.setTimestamp(dstSt.getModificationTime());
+                amLib.setSize(dstSt.getLen());
+                DFSResourceCoordinate amLibCoord = new DFSResourceCoordinate();
+                amLibCoord.res = amLib;
+                amLibCoord.name = f.getName();
+                if (f.getName().contains("asterix-yarn") || f.getName().contains("surefire")) {
+                    amLibCoord.envs.put(dst.toUri().toString(), AConstants.APPLICATIONMASTERJARLOCATION);
+                    amLibCoord.envs.put(Long.toString(dstSt.getLen()), AConstants.APPLICATIONMASTERJARLEN);
+                    amLibCoord.envs.put(Long.toString(dstSt.getModificationTime()),
+                            AConstants.APPLICATIONMASTERJARTIMESTAMP);
+                }
+                resources.add(amLibCoord);
+            }
+
+        }
+        if (resources.size() == 0) {
+            throw new IOException("Required JARs are missing. Please check your directory structure");
+        }
+        return resources;
+    }
+
+    /**
+     * Uploads a AsterixDB cluster configuration to HDFS for the AM to use.
+     * @param overwrite Overwrite existing configurations by the same name.
+     * @throws IllegalStateException
+     * @throws IOException
+     */
+    private void installAsterixConfig(boolean overwrite) throws IllegalStateException, IOException {
+        FileSystem fs = FileSystem.get(conf);
+        File srcfile = new File(MERGED_PARAMETERS_PATH);
+        Path src = new Path(srcfile.getCanonicalPath());
+        String pathSuffix = CONF_DIR_REL + instanceFolder + File.separator + PARAMS_DEFAULT_NAME;
+        Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+        if (fs.exists(dst) && !overwrite) {
+
+            throw new IllegalStateException(
+                    "Instance exists. Please delete an existing instance before trying to overwrite");
+        }
+        fs.copyFromLocalFile(false, true, src, dst);
+    }
+
+    /**
+     * Uploads binary resources to HDFS for use by the AM
+     * @return
+     * @throws IOException
+     * @throws YarnException
+     */
+    public List<DFSResourceCoordinate> distributeBinaries() throws IOException, YarnException {
+
+        List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+        // Copy the application master jar to the filesystem
+        // Create a local resource to point to the destination jar path
+        FileSystem fs = FileSystem.get(conf);
+        Path src, dst;
+        FileStatus destStatus;
+        String pathSuffix;
+
+        // adding info so we can add the jar to the App master container path
+
+        // Add the asterix tarfile to HDFS for easy distribution
+        // Keep it all archived for now so add it as a file...
+
+        pathSuffix = CONF_DIR_REL + instanceFolder + "asterix-server.zip";
+        dst = new Path(fs.getHomeDirectory(), pathSuffix);
+        if (refresh) {
+            if (fs.exists(dst)) {
+                fs.delete(dst, false);
+            }
+        }
+        if (!fs.exists(dst)) {
+            src = new Path(asterixZip);
+            LOG.info("Copying Asterix distributable to DFS");
+            fs.copyFromLocalFile(false, true, src, dst);
+        }
+        destStatus = fs.getFileStatus(dst);
+        LocalResource asterixTarLoc = Records.newRecord(LocalResource.class);
+        asterixTarLoc.setType(LocalResourceType.ARCHIVE);
+        asterixTarLoc.setVisibility(LocalResourceVisibility.PRIVATE);
+        asterixTarLoc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+        asterixTarLoc.setTimestamp(destStatus.getModificationTime());
+
+        // adding info so we can add the tarball to the App master container path
+        DFSResourceCoordinate tar = new DFSResourceCoordinate();
+        tar.envs.put(dst.toUri().toString(), AConstants.TARLOCATION);
+        tar.envs.put(Long.toString(asterixTarLoc.getSize()), AConstants.TARLEN);
+        tar.envs.put(Long.toString(asterixTarLoc.getTimestamp()), AConstants.TARTIMESTAMP);
+        tar.res = asterixTarLoc;
+        tar.name = "asterix-server.zip";
+        resources.add(tar);
+
+        // Set the log4j properties if needed
+        if (!log4jPropFile.isEmpty()) {
+            Path log4jSrc = new Path(log4jPropFile);
+            Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
+            fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
+            FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
+            LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
+            log4jRsrc.setType(LocalResourceType.FILE);
+            log4jRsrc.setVisibility(LocalResourceVisibility.PRIVATE);
+            log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
+            log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
+            log4jRsrc.setSize(log4jFileStatus.getLen());
+            DFSResourceCoordinate l4j = new DFSResourceCoordinate();
+            tar.res = log4jRsrc;
+            tar.name = "log4j.properties";
+            resources.add(l4j);
+        }
+
+        resources.addAll(installAmLibs());
+        return resources;
+    }
+
+    /**
+     * Submits the request to start the AsterixApplicationMaster to the YARN ResourceManager.
+     * 
+     * @param app
+     *            The application attempt handle.
+     * @param resources
+     *            Resources to be distributed as part of the container launch
+     * @param mode
+     *            The mode of the ApplicationMaster
+     * @return The application ID of the new Asterix instance.
+     * @throws IOException
+     * @throws YarnException
+     */
+
+    public ApplicationId deployAM(YarnClientApplication app, List<DFSResourceCoordinate> resources, Mode mode)
+            throws IOException, YarnException {
+
+        // Set up the container launch context for the application master
+        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+
+        // Set local resource info into app master container launch context
+        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+        for (DFSResourceCoordinate res : resources) {
+            localResources.put(res.name, res.res);
+        }
+        amContainer.setLocalResources(localResources);
+        // Set the env variables to be setup in the env where the application
+        // master will be run
+        LOG.info("Set the environment for the application master");
+        Map<String, String> env = new HashMap<String, String>();
+
+        // using the env info, the application master will create the correct
+        // local resource for the
+        // eventual containers that will be launched to execute the shell
+        // scripts
+        for (DFSResourceCoordinate res : resources) {
+            if (res.envs == null) { //some entries may not have environment variables.
+                continue;
+            }
+            for (Map.Entry<String, String> e : res.envs.entrySet()) {
+                env.put(e.getValue(), e.getKey());
+            }
+        }
+        //this is needed for when the RM address isn't known from the environment of the AM
+        env.put(AConstants.RMADDRESS, conf.get("yarn.resourcemanager.address"));
+        env.put(AConstants.RMSCHEDULERADDRESS, conf.get("yarn.resourcemanager.scheduler.address"));
+        ///add miscellaneous environment variables.
+        env.put(AConstants.INSTANCESTORE, CONF_DIR_REL + instanceFolder);
+        env.put(AConstants.DFS_BASE, FileSystem.get(conf).getHomeDirectory().toUri().toString());
+        env.put(AConstants.CC_JAVA_OPTS, ccJavaOpts);
+        env.put(AConstants.NC_JAVA_OPTS, ncJavaOpts);
+
+        // Add AppMaster.jar location to classpath
+        // At some point we should not be required to add
+        // the hadoop specific classpaths to the env.
+        // It should be provided out of the box.
+        // For now setting all required classpaths including
+        // the classpath to "." for the application jar
+        StringBuilder classPathEnv = new StringBuilder("").append("./*");
+        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+            classPathEnv.append(File.pathSeparatorChar);
+            classPathEnv.append(c.trim());
+        }
+        classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
+
+        // add the runtime classpath needed for tests to work
+        if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+            LOG.info("In YARN MiniCluster");
+            classPathEnv.append(System.getProperty("path.separator"));
+            classPathEnv.append(System.getProperty("java.class.path"));
+            env.put("HADOOP_CONF_DIR", System.getProperty("user.dir") + File.separator + "target" + File.separator);
+        }
+        LOG.info("AM Classpath:" + classPathEnv.toString());
+        env.put("CLASSPATH", classPathEnv.toString());
+
+        amContainer.setEnvironment(env);
+
+        // Set the necessary command to execute the application master
+        Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+        // Set java executable command
+        LOG.info("Setting up app master command");
+        vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+        // Set class name
+        vargs.add(appMasterMainClass);
+        //Set params for Application Master
+        if (debugFlag) {
+            vargs.add("-debug");
+        }
+        if (mode == Mode.DESTROY) {
+            vargs.add("-obliterate");
+        }
+        else if (mode == Mode.BACKUP) {
+            vargs.add("-backup");
+        }
+        else if (mode == Mode.RESTORE) {
+            vargs.add("-restore " + snapName);
+        }
+        else if( mode == Mode.INSTALL){
+            vargs.add("-initial ");
+        }
+        if (refresh) {
+            vargs.add("-refresh");
+        }
+        //vargs.add("/bin/ls -alh asterix-server.zip/repo");
+        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "AppMaster.stdout");
+        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "AppMaster.stderr");
+        // Get final commmand
+        StringBuilder command = new StringBuilder();
+        for (CharSequence str : vargs) {
+            command.append(str).append(" ");
+        }
+
+        LOG.info("Completed setting up app master command " + command.toString());
+        List<String> commands = new ArrayList<String>();
+        commands.add(command.toString());
+        amContainer.setCommands(commands);
+
+        // Set up resource type requirements
+        // For now, only memory is supported so we set memory requirements
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(amMemory);
+        appContext.setResource(capability);
+
+        // Service data is a binary blob that can be passed to the application
+        // Not needed in this scenario
+        // amContainer.setServiceData(serviceData);
+
+        // The following are not required for launching an application master
+        // amContainer.setContainerId(containerId);
+
+        appContext.setAMContainerSpec(amContainer);
+
+        // Set the priority for the application master
+        Priority pri = Records.newRecord(Priority.class);
+        // TODO - what is the range for priority? how to decide?
+        pri.setPriority(amPriority);
+        appContext.setPriority(pri);
+
+        // Set the queue to which this application is to be submitted in the RM
+        appContext.setQueue(amQueue);
+
+        // Submit the application to the applications manager
+        // SubmitApplicationResponse submitResp =
+        // applicationsManager.submitApplication(appRequest);
+        // Ignore the response as either a valid response object is returned on
+        // success
+        // or an exception thrown to denote some form of a failure
+        LOG.info("Submitting application to ASM");
+
+        yarnClient.submitApplication(appContext);
+
+        //now write the instance lock
+        if (mode == Mode.INSTALL || mode == Mode.START) {
+            FileSystem fs = FileSystem.get(conf);
+            Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+            if (fs.exists(lock)) {
+                throw new IllegalStateException("Somehow, this instance has been launched twice. ");
+            }
+            BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(lock, true)));
+            try {
+                out.write(app.getApplicationSubmissionContext().getApplicationId().toString());
+                out.close();
+            } finally {
+                out.close();
+            }
+        }
+        return app.getApplicationSubmissionContext().getApplicationId();
+
+    }
+
+    /**
+     * Asks YARN to kill a given application by appId
+     * @param appId The application to kill.
+     * @param yarnClient The YARN client object that is connected to the RM.
+     * @throws YarnException
+     * @throws IOException
+     */
+
+    public static void killApplication(ApplicationId appId, YarnClient yarnClient) throws YarnException, IOException {
+        if (appId == null) {
+            throw new YarnException("No Application given to kill");
+        }
+        if (yarnClient.isInState(STATE.INITED)) {
+            yarnClient.start();
+        }
+        YarnApplicationState st;
+        ApplicationReport rep = yarnClient.getApplicationReport(appId);
+        st = rep.getYarnApplicationState();
+        if (st == YarnApplicationState.FINISHED || st == YarnApplicationState.KILLED
+                || st == YarnApplicationState.FAILED) {
+            LOG.info("Application " + appId + " already exited.");
+            return;
+        }
+        LOG.info("Killing applicaiton with ID: " + appId);
+        yarnClient.killApplication(appId);
+
+    }
+
+    /**
+     * Tries to stop a running AsterixDB instance gracefully.
+     * @throws IOException
+     * @throws YarnException
+     */
+    private void stopInstanceIfRunning()
+            throws IOException, YarnException {
+        FileSystem fs = FileSystem.get(conf);
+        String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
+        Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+        //if the instance is up, fix that
+        if (isRunning()) {
+            try {
+                this.stopInstance();
+            } catch (IOException e) {
+                throw new YarnException(e);
+            }
+        } else if (!fs.exists(dstConf)) {
+            throw new YarnException("No instance configured with that name exists");
+        }
+    }
+
+    /**
+     * Start a YARN job to delete local AsterixDB resources of an extant instance
+     * @param app The Client connection
+     * @param resources AM resources
+     * @throws IOException
+     * @throws YarnException
+     */
+
+    private void removeInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        FileSystem fs = FileSystem.get(conf);
+        //if the instance is up, fix that
+        stopInstanceIfRunning();
+        //now try deleting all of the on-disk artifacts on the cluster
+        ApplicationId deleter = deployAM(app, resources, Mode.DESTROY);
+        boolean delete_start = Utils.waitForApplication(deleter, yarnClient, "Waiting for deletion to start", ccRestPort);
+        if (!delete_start) {
+            if (force) {
+                fs.delete(new Path(CONF_DIR_REL + instanceFolder), true);
+                LOG.error("Forcing deletion of HDFS resources");
+            }
+            LOG.fatal(" of on-disk persistient resources on individual nodes failed.");
+            throw new YarnException();
+        }
+        boolean deleted = waitForCompletion(deleter, "Deletion in progress");
+        if (!(deleted || force)) {
+            LOG.fatal("Cleanup of on-disk persistent resources failed.");
+            return;
+        } else {
+            fs.delete(new Path(CONF_DIR_REL + instanceFolder), true);
+        }
+        System.out.println("Deletion of instance succeeded.");
+
+    }
+
+    /**
+     * Start a YARN job to copy all data-containing resources of an AsterixDB instance to HDFS
+     * @param app
+     * @param resources
+     * @throws IOException
+     * @throws YarnException
+     */
+
+    private void backupInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        stopInstanceIfRunning();
+        ApplicationId backerUpper = deployAM(app, resources, Mode.BACKUP);
+        boolean backupStart;
+        backupStart = Utils.waitForApplication(backerUpper, yarnClient, "Waiting for backup " + backerUpper.toString()
+                + "to start", ccRestPort);
+        if (!backupStart) {
+            LOG.fatal("Backup failed to start");
+            throw new YarnException();
+        }
+        boolean complete;
+        complete = waitForCompletion(backerUpper, "Backup in progress");
+        if (!complete) {
+            LOG.fatal("Backup failed- timeout waiting for completion");
+            return;
+        }
+        System.out.println("Backup of instance succeeded.");
+    }
+
+    /**
+     * Start a YARN job to copy a set of resources from backupInstance to restore the state of an extant AsterixDB instance
+     * @param app
+     * @param resources
+     * @throws IOException
+     * @throws YarnException
+     */
+
+    private void restoreInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        stopInstanceIfRunning();
+        ApplicationId restorer = deployAM(app, resources, Mode.RESTORE);
+        boolean restoreStart = Utils.waitForApplication(restorer, yarnClient, "Waiting for restore to start", ccRestPort);
+        if (!restoreStart) {
+            LOG.fatal("Restore failed to start");
+            throw new YarnException();
+        }
+        boolean complete = waitForCompletion(restorer, "Restore in progress");
+        if (!complete) {
+            LOG.fatal("Restore failed- timeout waiting for completion");
+            return;
+        }
+        System.out.println("Restoration of instance succeeded.");
+    }
+
+    /**
+     * Stops the instance and remove the lockfile to allow a restart.
+     * 
+     * @throws IOException
+     * @throws JAXBException
+     * @throws YarnException
+     */
+
+    private void stopInstance() throws IOException, YarnException {
+        ApplicationId appId = getLockFile();
+        //get CC rest API port if it is nonstandard
+        readConfigParams(locateConfig());
+        if (yarnClient.isInState(STATE.INITED)) {
+            yarnClient.start();
+        }
+        System.out.println("Stopping instance " + instanceName);
+        if (!isRunning()) {
+            LOG.fatal("AsterixDB instance by that name is stopped already");
+            return;
+        }
+        try {
+            String ccIp = Utils.getCCHostname(instanceName, conf);
+            Utils.sendShutdownCall(ccIp,ccRestPort);
+        } catch (IOException e) {
+            LOG.error("Error while trying to issue safe shutdown:", e);
+        }
+        //now make sure it is actually gone and not "stuck"
+        String message = "Waiting for AsterixDB to shut down";
+        boolean completed = waitForCompletion(appId, message);
+        if (!completed && force) {
+            LOG.warn("Instance failed to stop gracefully, now killing it");
+            try {
+                AsterixYARNClient.killApplication(appId, yarnClient);
+                completed = true;
+            } catch (YarnException e1) {
+                LOG.fatal("Could not stop nor kill instance gracefully.",e1);
+                return;
+            }
+        }
+        if (completed) {
+            deleteLockFile();
+        }
+    }
+
+    private void deleteLockFile() throws IOException {
+        if (instanceName == null || instanceName == "") {
+            return;
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceName + '/' + instanceLock);
+        if (fs.exists(lockPath)) {
+            fs.delete(lockPath, false);
+        }
+    }
+
+    private boolean instanceExists() throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
+        Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+        return fs.exists(dstConf);
+    }
+
+    private boolean isRunning() throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
+        Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+        if (fs.exists(dstConf)) {
+            Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+            return fs.exists(lock);
+        } else {
+            return false;
+        }
+    }
+
+    private ApplicationId getLockFile() throws IOException, YarnException {
+        if (instanceFolder == "") {
+            throw new IllegalStateException("Instance name not given.");
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+        if (!fs.exists(lockPath)) {
+            throw new YarnException("Instance appears to not be running. If you know it is, try using kill");
+        }
+        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(lockPath)));
+        String lockAppId = br.readLine();
+        br.close();
+        return ConverterUtils.toApplicationId(lockAppId);
+    }
+
+    public static ApplicationId getLockFile(String instanceName, Configuration conf) throws IOException {
+        if (instanceName == "") {
+            throw new IllegalStateException("Instance name not given.");
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceName + '/' + instanceLock);
+        if (!fs.exists(lockPath)) {
+            return null;
+        }
+        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(lockPath)));
+        String lockAppId = br.readLine();
+        br.close();
+        return ConverterUtils.toApplicationId(lockAppId);
+    }
+
+    /**
+     * Locate the Asterix parameters file.
+      * @return
+     * @throws FileNotFoundException
+     * @throws IOException
+     */
+    private AsterixConfiguration locateConfig() throws FileNotFoundException, IOException{
+        AsterixConfiguration configuration;
+        String configPathBase = MERGED_PARAMETERS_PATH;
+        if (baseConfig != null) {
+            configuration = Utils.loadAsterixConfig(baseConfig);
+            configPathBase = new File(baseConfig).getParentFile().getAbsolutePath() + File.separator
+                    + PARAMS_DEFAULT_NAME;
+            MERGED_PARAMETERS_PATH = configPathBase;
+        } else {
+            configuration = Utils.loadAsterixConfig(DEFAULT_PARAMETERS_PATH);
+        }
+        return configuration;
+    }
+
+    /**
+     *
+     */
+    private void readConfigParams(AsterixConfiguration configuration){
+        //this is the "base" config that is inside the zip, we start here
+        for (edu.uci.ics.asterix.common.configuration.Property property : configuration.getProperty()) {
+            if (property.getName().equalsIgnoreCase(CC_JAVA_OPTS_KEY)) {
+                ccJavaOpts = property.getValue();
+            } else if (property.getName().equalsIgnoreCase(NC_JAVA_OPTS_KEY)) {
+                ncJavaOpts = property.getValue();
+            } else if(property.getName().equalsIgnoreCase(CC_REST_PORT_KEY)){
+                ccRestPort = Integer.parseInt(property.getValue());
+            }
+
+        }
+    }
+
+    /**
+     * Retrieves necessary information from the cluster configuration and splices it into the Asterix configuration parameters
+     * @param cluster
+     * @throws FileNotFoundException
+     * @throws IOException
+     */
+
+    private void writeAsterixConfig(Cluster cluster) throws FileNotFoundException, IOException {
+        String metadataNodeId = Utils.getMetadataNode(cluster).getId();
+        String asterixInstanceName = instanceName;
+
+        AsterixConfiguration configuration = locateConfig();
+
+        readConfigParams(configuration);
+
+        String version = Utils.getAsterixVersionFromClasspath();
+        configuration.setVersion(version);
+
+        configuration.setInstanceName(asterixInstanceName);
+        String storeDir = null;
+        List<Store> stores = new ArrayList<Store>();
+        for (Node node : cluster.getNode()) {
+            storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
+            stores.add(new Store(node.getId(), storeDir));
+        }
+        configuration.setStore(stores);
+
+        List<Coredump> coredump = new ArrayList<Coredump>();
+        String coredumpDir = null;
+        List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
+        String txnLogDir = null;
+        for (Node node : cluster.getNode()) {
+            coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
+            coredump.add(new Coredump(node.getId(), coredumpDir + "coredump" + File.separator));
+            txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir(); //node or cluster-wide
+            txnLogDirs.add(new TransactionLogDir(node.getId(), txnLogDir
+                    + (txnLogDir.charAt(txnLogDir.length() - 1) == File.separatorChar ? File.separator : "")
+                    + "txnLogs" //if the string doesn't have a trailing / add one
+                    + File.separator));
+        }
+        configuration.setMetadataNode(metadataNodeId);
+
+        configuration.setCoredump(coredump);
+        configuration.setTransactionLogDir(txnLogDirs);
+        FileOutputStream os = new FileOutputStream(MERGED_PARAMETERS_PATH);
+        try {
+            JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
+            Marshaller marshaller = ctx.createMarshaller();
+            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+
+            marshaller.marshal(configuration, os);
+        } catch (JAXBException e) {
+            throw new IOException(e);
+        } finally {
+            os.close();
+        }
+    }
+
+    private boolean waitForCompletion(ApplicationId appId, String message) throws YarnException, IOException {
+        return Utils.waitForApplication(appId, yarnClient, message, ccRestPort);
+    }
+
+    private class DFSResourceCoordinate {
+        String name;
+        LocalResource res;
+        Map<String, String> envs;
+
+        public DFSResourceCoordinate() {
+            envs = new HashMap<String, String>(3);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/69375a19/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java
----------------------------------------------------------------------
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java
new file mode 100644
index 0000000..55a9d0b
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+public class Deleter {
+    private static final Log LOG = LogFactory.getLog(Deleter.class);
+
+    public static void main(String[] args) throws IOException {
+
+	LogManager.getRootLogger().setLevel(Level.DEBUG); 
+	
+        LOG.info("Obliterator args: " + Arrays.toString(args));
+        for (int i = 0; i < args.length; i++) {
+            File f = new File(args[i]);
+            if (f.exists()) {
+                LOG.info("Deleting: " + f.getPath());
+                FileUtils.deleteDirectory(f);
+            } else {
+                LOG.error("Could not find file to delete: " + f.getPath());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/69375a19/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java
----------------------------------------------------------------------
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java
new file mode 100644
index 0000000..13c9a6e
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java
@@ -0,0 +1,94 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+public class HDFSBackup {
+    Configuration conf = new YarnConfiguration();
+    private static final Log LOG = LogFactory.getLog(AsterixApplicationMaster.class);
+    boolean restore = false;
+    boolean backup = false;
+
+    public static void main(String[] args) throws ParseException, IllegalArgumentException, IOException {
+
+        HDFSBackup back = new HDFSBackup();
+        Map<String, String> envs = System.getenv();
+        if(envs.containsKey("HADOOP_CONF_DIR")){
+            File hadoopConfDir = new File(envs.get("HADOOP_CONF_DIR"));
+            if(hadoopConfDir.isDirectory()){
+               for(File config: hadoopConfDir.listFiles()){
+                   if(config.getName().matches("^.*(xml)$")){
+                       back.conf.addResource(new Path(config.getAbsolutePath()));
+                   }
+               }
+            }
+        }
+        Options opts = new Options();
+        opts.addOption("restore", false, "");
+        opts.addOption("backup", false, "");
+        CommandLine cliParser = new GnuParser().parse(opts, args);
+        if (cliParser.hasOption("restore")) {
+            back.restore = true;
+        }
+        if (cliParser.hasOption("backup")) {
+            back.backup = true;
+        }
+        @SuppressWarnings("unchecked")
+        List<String> pairs = (List<String>) cliParser.getArgList();
+
+        List<Path[]> sources = new ArrayList<Path[]>(10);
+        for (String p : pairs) {
+            String[] s = p.split(",");
+            sources.add(new Path[] { new Path(s[0]), new Path(s[1]) });
+        }
+
+        try {
+            if (back.backup) {
+                back.performBackup(sources);
+            }
+            if (back.restore) {
+                back.performRestore(sources);
+            }
+        } catch (IOException e) {
+            back.LOG.fatal("Backup/restoration unsuccessful: " + e.getMessage());
+            throw e;
+        }
+    }
+
+    private void performBackup(List<Path[]> paths) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        for (Path[] p : paths) {
+            LOG.info("Backing up " + p[0] + " to " + p[1] + ".");
+            fs.copyFromLocalFile(p[0], p[1]);
+        }
+    }
+
+    private void performRestore(List<Path[]> paths) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        for (Path[] p : paths) {
+            LOG.info("Restoring " + p[0] + " to " + p[1] + ".");
+            File f = new File(p[1].toString() + File.separator + p[0].getName());
+            LOG.info(f.getAbsolutePath());
+            if (f.exists()) {
+                FileUtils.deleteDirectory(f);
+            }
+            LOG.info(f.exists());
+            fs.copyToLocalFile(false, p[0], p[1], true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/69375a19/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java
----------------------------------------------------------------------
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java
new file mode 100644
index 0000000..4eb8bc3
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java
@@ -0,0 +1,462 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Scanner;
+import java.util.regex.Pattern;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.commons.httpclient.*;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+
+public class Utils {
+
+    private Utils() {
+
+    }
+
+    private static final String CONF_DIR_REL = AsterixYARNClient.CONF_DIR_REL;
+
+    public static String hostFromContainerID(String containerID) {
+        return containerID.split("_")[4];
+    }
+
+    /**
+     * Gets the metadata node from an AsterixDB cluster description file
+     * 
+     * @param cluster
+     *            The cluster description in question.
+     * @return
+     */
+    public static Node getMetadataNode(Cluster cluster) {
+        Node metadataNode = null;
+        if (cluster.getMetadataNode() != null) {
+            for (Node node : cluster.getNode()) {
+                if (node.getId().equals(cluster.getMetadataNode())) {
+                    metadataNode = node;
+                    break;
+                }
+            }
+        } else {
+            //I will pick one for you.
+            metadataNode = cluster.getNode().get(1);
+        }
+        return metadataNode;
+    }
+
+    /**
+     * Sends a "poison pill" message to an AsterixDB instance for it to shut down safely.
+     * 
+     * @param host
+     *            The host to shut down.
+     * @throws IOException
+     */
+
+    public static void sendShutdownCall(String host, int port) throws IOException {
+        final String url = "http://" + host + ":" + port + "/admin/shutdown";
+        PostMethod method = new PostMethod(url);
+        try {
+            executeHTTPCall(method);
+        } catch (NoHttpResponseException e) {
+            //do nothing... this is expected
+        }
+        //now let's test that the instance is really down, or throw an exception
+        try {
+            executeHTTPCall(method);
+        } catch (ConnectException e) {
+            return;
+        }
+        throw new IOException("Instance did not shut down cleanly.");
+    }
+
+    /**
+     * Simple test via the AsterixDB Javascript API to determine if an instance is truly live or not.
+     * Queries the Metadata dataset and returns true if the query completes successfully, false otherwise.
+     * 
+     * @param host
+     *            The host to run the query against
+     * @return
+     *         True if the instance is OK, false otherwise.
+     * @throws IOException
+     */
+    public static boolean probeLiveness(String host, int port) throws IOException {
+        final String url = "http://" + host + ":" + port + "/query";
+        final String test = "for $x in dataset Metadata.Dataset return $x;";
+        GetMethod method = new GetMethod(url);
+        method.setQueryString(new NameValuePair[] { new NameValuePair("query", test) });
+        InputStream response;
+        try {
+            response = executeHTTPCall(method);
+        } catch (ConnectException e) {
+            return false;
+        }
+        if (response == null) {
+            return false;
+        }
+        BufferedReader br = new BufferedReader(new InputStreamReader(response));
+        String result = br.readLine();
+        if (result == null) {
+            return false;
+        }
+        if(method.getStatusCode() != HttpStatus.SC_OK){
+            return false;
+        }
+        return true;
+    }
+
+    private static InputStream executeHTTPCall(HttpMethod method) throws HttpException, IOException {
+        HttpClient client = new HttpClient();
+        HttpMethodRetryHandler noop = new HttpMethodRetryHandler() {
+            @Override
+            public boolean retryMethod(final HttpMethod method, final IOException exception, int executionCount) {
+                return false;
+            }
+        };
+        client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, noop);
+        client.executeMethod(method);
+        return method.getResponseBodyAsStream();
+    }
+
+    //**
+
+    public static String makeDots(int iter) {
+        int pos = iter % 3;
+        char[] dots = { ' ', ' ', ' ' };
+        dots[pos] = '.';
+        return new String(dots);
+    }
+
+    public static boolean confirmAction(String warning) {
+        System.out.println(warning);
+        System.out.print("Are you sure you want to do this? (yes/no): ");
+        Scanner in = new Scanner(System.in);
+        while (true) {
+            try {
+                String input = in.nextLine();
+                if ("yes".equals(input)) {
+                    return true;
+                } else if ("no".equals(input)) {
+                    return false;
+                } else {
+                    System.out.println("Please type yes or no");
+                }
+            } finally {
+                in.close();
+            }
+        }
+    }
+
+    /**
+     * Lists the deployed instances of AsterixDB on a YARN cluster
+     * 
+     * @param conf
+     *            Hadoop configuration object
+     * @param confDirRel
+     *            Relative AsterixDB configuration path for DFS
+     * @throws IOException
+     */
+
+    public static void listInstances(Configuration conf, String confDirRel) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path instanceFolder = new Path(fs.getHomeDirectory(), confDirRel);
+        if (!fs.exists(instanceFolder)) {
+            System.out.println("No running or stopped AsterixDB instances exist in this cluster.");
+            return;
+        }
+        FileStatus[] instances = fs.listStatus(instanceFolder);
+        if (instances.length != 0) {
+            System.out.println("Existing AsterixDB instances: ");
+            for (int i = 0; i < instances.length; i++) {
+                FileStatus st = instances[i];
+                String name = st.getPath().getName();
+                ApplicationId lockFile = AsterixYARNClient.getLockFile(name, conf);
+                if (lockFile != null) {
+                    System.out.println("Instance " + name + " is running with Application ID: " + lockFile.toString());
+                } else {
+                    System.out.println("Instance " + name + " is stopped");
+                }
+            }
+        } else {
+            System.out.println("No running or stopped AsterixDB instances exist in this cluster");
+        }
+    }
+
+    /**
+     * Lists the backups in the DFS.
+     * 
+     * @param conf
+     *            YARN configuration
+     * @param confDirRel
+     *            Relative config path
+     * @param instance
+     *            Instance name
+     * @throws IOException
+     */
+    public static void listBackups(Configuration conf, String confDirRel, String instance) throws IOException {
+        List<String> backups = getBackups(conf,confDirRel,instance);
+        if (backups.size() != 0) {
+            System.out.println("Backups for instance " + instance + ": ");
+            for (String name : backups) {
+                System.out.println("Backup: " + name);
+            }
+        } else {
+            System.out.println("No backups found for instance " + instance + ".");
+        }
+    }
+   /**
+    * Return the available snapshot names 
+    * @param conf 
+    * @param confDirRel
+    * @param instance
+    * @return
+    * @throws IOException
+    */
+    public static List<String> getBackups(Configuration conf, String confDirRel, String instance) throws IOException{
+        FileSystem fs = FileSystem.get(conf);
+        Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups");
+        FileStatus[] backups = fs.listStatus(backupFolder);
+        List<String> backupNames = new ArrayList<String>();
+        for(FileStatus f: backups){
+            backupNames.add(f.getPath().getName());
+        }
+        return backupNames;
+    }
+
+    /**
+     * Removes backup snapshots from the DFS
+     * 
+     * @param conf
+     *            DFS Configuration
+     * @param confDirRel
+     *            Configuration relative directory
+     * @param instance
+     *            The asterix instance name
+     * @param timestamp
+     *            The snapshot timestap (ID)
+     * @throws IOException
+     */
+    public static void rmBackup(Configuration conf, String confDirRel, String instance, long timestamp)
+            throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups");
+        FileStatus[] backups = fs.listStatus(backupFolder);
+        if (backups.length != 0) {
+            System.out.println("Backups for instance " + instance + ": ");
+        } else {
+            System.out.println("No backups found for instance " + instance + ".");
+        }
+        for (FileStatus f : backups) {
+            String name = f.getPath().getName();
+            long file_ts = Long.parseLong(name);
+            if (file_ts == timestamp) {
+                System.out.println("Deleting backup " + timestamp);
+                if (!fs.delete(f.getPath(), true)) {
+                    System.out.println("Backup could not be deleted");
+                    return;
+                } else {
+                    return;
+                }
+            }
+        }
+        System.out.println("No backup found with specified timestamp");
+
+    }
+
+    /**
+     * Simply parses out the YARN cluster config and instantiates it into a nice object.
+     * 
+     * @return The object representing the configuration
+     * @throws FileNotFoundException
+     * @throws JAXBException
+     */
+    public static Cluster parseYarnClusterConfig(String path) throws YarnException {
+        try {
+            File f = new File(path);
+            JAXBContext configCtx = JAXBContext.newInstance(Cluster.class);
+            Unmarshaller unmarshaller = configCtx.createUnmarshaller();
+            Cluster cl = (Cluster) unmarshaller.unmarshal(f);
+            return cl;
+        } catch (JAXBException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    public static void writeYarnClusterConfig(String path, Cluster cl) throws YarnException {
+        try {
+            File f = new File(path);
+            JAXBContext configCtx = JAXBContext.newInstance(Cluster.class);
+            Marshaller marhsaller = configCtx.createMarshaller();
+            marhsaller.marshal(cl, f);
+        } catch (JAXBException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    /**
+     * Looks in the current class path for AsterixDB libraries and gets the version number from the name of the first match.
+     * 
+     * @return The version found, as a string.
+     */
+
+    public static String getAsterixVersionFromClasspath() {
+        String[] cp = System.getProperty("java.class.path").split(System.getProperty("path.separator"));
+        String asterixJarPattern = "^(asterix).*(jar)$"; //starts with asterix,ends with jar
+
+        for (String j : cp) {
+            //escape backslashes for windows
+            String[] pathComponents = j.split(Pattern.quote(File.separator));
+            if (pathComponents[pathComponents.length - 1].matches(asterixJarPattern)) {
+                //get components of maven version
+                String[] byDash = pathComponents[pathComponents.length - 1].split("-");
+                //get the version number but remove the possible '.jar' tailing it
+                String version = (byDash[2].split("\\."))[0];
+                //SNAPSHOT suffix
+                if (byDash.length == 4) {
+                    //do the same if it's a snapshot suffix
+                    return version + '-' + (byDash[3].split("\\."))[0];
+                }
+                //stable version
+                return version;
+            }
+        }
+        return null;
+    }
+
+    public static boolean waitForLiveness(ApplicationId appId, boolean probe, boolean print, String message,
+            YarnClient yarnClient, String instanceName, Configuration conf, int port) throws YarnException {
+        ApplicationReport report;
+        try {
+            report = yarnClient.getApplicationReport(appId);
+        } catch (IOException e) {
+            throw new YarnException(e);
+        }
+        YarnApplicationState st = report.getYarnApplicationState();
+        for (int i = 0; i < 120; i++) {
+            if (st != YarnApplicationState.RUNNING) {
+                try {
+                    report = yarnClient.getApplicationReport(appId);
+                    st = report.getYarnApplicationState();
+                    if (print) {
+                        System.out.print(message + Utils.makeDots(i) + "\r");
+                    }
+                    Thread.sleep(1000);
+                } catch (InterruptedException e1) {
+                    Thread.currentThread().interrupt();
+                } catch (IOException e1) {
+                    throw new YarnException(e1);
+                }
+                if (st == YarnApplicationState.FAILED || st == YarnApplicationState.FINISHED
+                        || st == YarnApplicationState.KILLED) {
+                    return false;
+                }
+            }
+            if (probe) {
+                String host;
+                host = getCCHostname(instanceName, conf);
+                try {
+                    for (int j = 0; j < 60; j++) {
+                        if (!Utils.probeLiveness(host, port)) {
+                            try {
+                                if (print) {
+                                    System.out.print(message + Utils.makeDots(i) + "\r");
+                                }
+                                Thread.sleep(1000);
+                            } catch (InterruptedException e2) {
+                                Thread.currentThread().interrupt();
+                            }
+                        } else {
+                            if (print) {
+                                System.out.println("");
+                            }
+                            return true;
+                        }
+                    }
+                } catch (IOException e1) {
+                    throw new YarnException(e1);
+                }
+            } else {
+                if (print) {
+                    System.out.println("");
+                }
+                return true;
+            }
+        }
+        if (print) {
+            System.out.println("");
+        }
+        return false;
+    }
+
+    public static boolean waitForLiveness(ApplicationId appId, String message, YarnClient yarnClient,
+            String instanceName, Configuration conf, int port) throws YarnException, IOException {
+        return waitForLiveness(appId, true, true, message, yarnClient, instanceName, conf, port);
+    }
+
+    public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, String message, int port)
+            throws YarnException, IOException {
+        return waitForLiveness(appId, false, true, message, yarnClient, "", null, port);
+    }
+
+    public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, int port) throws YarnException,
+            IOException, JAXBException {
+        return waitForLiveness(appId, false, false, "", yarnClient, "", null, port);
+    }
+
+    public static String getCCHostname(String instanceName, Configuration conf) throws YarnException {
+        try {
+            FileSystem fs = FileSystem.get(conf);
+            String instanceFolder = instanceName + "/";
+            String pathSuffix = CONF_DIR_REL + instanceFolder + "cluster-config.xml";
+            Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+            File tmp = File.createTempFile("cluster-config", "xml");
+            tmp.deleteOnExit();
+            fs.copyToLocalFile(dstConf, new Path(tmp.getPath()));
+            JAXBContext clusterConf = JAXBContext.newInstance(Cluster.class);
+            Unmarshaller unm = clusterConf.createUnmarshaller();
+            Cluster cl = (Cluster) unm.unmarshal(tmp);
+            String ccIp = cl.getMasterNode().getClientIp();
+            return ccIp;
+        } catch (IOException | JAXBException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    public static AsterixConfiguration loadAsterixConfig(String path) throws IOException {
+        File f = new File(path);
+        try {
+            JAXBContext configCtx = JAXBContext.newInstance(AsterixConfiguration.class);
+            Unmarshaller unmarshaller = configCtx.createUnmarshaller();
+            AsterixConfiguration conf = (AsterixConfiguration) unmarshaller.unmarshal(f);
+            return conf;
+        } catch (JAXBException e) {
+            throw new IOException(e);
+        }
+    }
+
+}


Mime
View raw message