Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C08FC172D9 for ; Tue, 30 Jun 2015 06:50:51 +0000 (UTC) Received: (qmail 27768 invoked by uid 500); 30 Jun 2015 06:50:51 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 27734 invoked by uid 500); 30 Jun 2015 06:50:51 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 27725 invoked by uid 99); 30 Jun 2015 06:50:51 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Jun 2015 06:50:51 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 158C3C0045 for ; Tue, 30 Jun 2015 06:50:51 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.791 X-Spam-Level: * X-Spam-Status: No, score=1.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id y83lkoOciMFE for ; Tue, 30 Jun 2015 06:50:37 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id AEC3E45CC1 for ; Tue, 30 Jun 2015 06:50:36 +0000 (UTC) Received: (qmail 27326 invoked by uid 99); 30 Jun 2015 06:50:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Jun 2015 06:50:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0BF94E35E9; Tue, 30 Jun 2015 06:50:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: imaxon@apache.org To: commits@asterixdb.incubator.apache.org Date: Tue, 30 Jun 2015 06:50:37 -0000 Message-Id: <2be5444e176344ab8daa87eb15c19906@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/4] incubator-asterixdb git commit: YARN integration for AsterixDB http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/69375a19/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java ---------------------------------------------------------------------- diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java new file mode 100644 index 0000000..63b85e6 --- /dev/null +++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java @@ -0,0 +1,1387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edu.uci.ics.asterix.aoya; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileFilter; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Vector; +import java.util.regex.Pattern; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; + +import com.google.common.collect.ImmutableMap; + +import edu.uci.ics.asterix.common.configuration.AsterixConfiguration; +import edu.uci.ics.asterix.common.configuration.Coredump; +import edu.uci.ics.asterix.common.configuration.Store; +import edu.uci.ics.asterix.common.configuration.TransactionLogDir; +import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster; +import edu.uci.ics.asterix.event.schema.yarnCluster.Node; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class AsterixYARNClient { + + public static enum Mode { + INSTALL("install"), + START("start"), + STOP("stop"), + KILL("kill"), + DESTROY("destroy"), + ALTER("alter"), + LIBINSTALL("libinstall"), + DESCRIBE("describe"), + BACKUP("backup"), + LSBACKUP("lsbackups"), + RMBACKUP("rmbackup"), + RESTORE("restore"), + NOOP(""); + + public final String alias; + + Mode(String alias) { + this.alias = alias; + } + + public static Mode fromAlias(String a) { + return STRING_TO_MODE.get(a.toLowerCase()); + } + } + + public static final Map STRING_TO_MODE = ImmutableMap + . builder().put(Mode.INSTALL.alias, Mode.INSTALL) + .put(Mode.START.alias, Mode.START).put(Mode.STOP.alias, Mode.STOP).put(Mode.KILL.alias, Mode.KILL) + .put(Mode.DESTROY.alias, Mode.DESTROY).put(Mode.ALTER.alias, Mode.ALTER) + .put(Mode.LIBINSTALL.alias, Mode.LIBINSTALL).put(Mode.DESCRIBE.alias, Mode.DESCRIBE) + .put(Mode.BACKUP.alias, Mode.BACKUP).put(Mode.LSBACKUP.alias, Mode.LSBACKUP) + .put(Mode.RMBACKUP.alias, Mode.RMBACKUP).put(Mode.RESTORE.alias, Mode.RESTORE).build(); + private static final Log LOG = LogFactory.getLog(AsterixYARNClient.class); + public static final String CONF_DIR_REL = ".asterix" + File.separator; + private static final String instanceLock = "instance"; + public static final String CONFIG_DEFAULT_NAME = "cluster-config.xml"; + public static final String PARAMS_DEFAULT_NAME = "asterix-configuration.xml"; + private static String DEFAULT_PARAMETERS_PATH = "conf" + File.separator + "base-asterix-configuration.xml"; + private static String MERGED_PARAMETERS_PATH = "conf" + File.separator + PARAMS_DEFAULT_NAME; + private static final String JAVA_HOME = System.getProperty("java.home"); + public static final String NC_JAVA_OPTS_KEY = "nc.java.opts"; + public static final String CC_JAVA_OPTS_KEY = "cc.java.opts"; + public static final String CC_REST_PORT_KEY = "api.port"; + private Mode mode = Mode.NOOP; + + // Hadoop Configuration + private Configuration conf; + private YarnClient yarnClient; + // Application master specific info to register a new Application with + // RM/ASM + private String appName = ""; + // App master priority + private int amPriority = 0; + // Queue for App master + private String amQueue = ""; + // Amt. of memory resource to request for to run the App Master + private int amMemory = 1000; + + // Main class to invoke application master + private final String appMasterMainClass = "edu.uci.ics.asterix.aoya.AsterixApplicationMaster"; + + //instance name + private String instanceName = ""; + //location of distributable AsterixDB zip + private String asterixZip = ""; + // Location of cluster configuration + private String asterixConf = ""; + // Location of optional external libraries + private String extLibs = ""; + + private String instanceFolder = ""; + + // log4j.properties file + // if available, add to local resources and set into classpath + private String log4jPropFile = ""; + + // Debug flag + boolean debugFlag = false; + private boolean refresh = false; + private boolean force = false; + + // Command line options + private Options opts; + private String libDataverse; + private String snapName = ""; + private String baseConfig = "."; + private String ccJavaOpts = ""; + private String ncJavaOpts = ""; + + //Ports + private int ccRestPort = 19002; + + /** + * @param args + * Command line arguments + */ + public static void main(String[] args) { + + try { + AsterixYARNClient client = new AsterixYARNClient(); + try { + client.init(args); + AsterixYARNClient.execute(client); + } catch (ParseException | ApplicationNotFoundException e) { + LOG.fatal(e); + client.printUsage(); + System.exit(-1); + } + } catch (Exception e) { + LOG.fatal("Error running client", e); + System.exit(1); + } + LOG.info("Command executed successfully."); + System.exit(0); + } + + public static void execute(AsterixYARNClient client) throws IOException, YarnException { + YarnClientApplication app; + List res; + + System.out.println("JAVA HOME: " + JAVA_HOME); + switch (client.mode) { + case START: + startAction(client); + break; + case STOP: + try { + client.stopInstance(); + } catch (ApplicationNotFoundException e) { + LOG.info(e); + System.out.println("Asterix instance by that name already exited or was never started"); + client.deleteLockFile(); + } + break; + case KILL: + if (client.isRunning() && + Utils.confirmAction("Are you sure you want to kill this instance? In-progress tasks will be aborted")) { + try { + AsterixYARNClient.killApplication(client.getLockFile(), client.yarnClient); + } catch (ApplicationNotFoundException e) { + LOG.info(e); + System.out.println("Asterix instance by that name already exited or was never started"); + client.deleteLockFile(); + } + } + else if(!client.isRunning()){ + System.out.println("Asterix instance by that name already exited or was never started"); + client.deleteLockFile(); + } + break; + case DESCRIBE: + Utils.listInstances(client.conf, CONF_DIR_REL); + break; + case INSTALL: + installAction(client); + break; + case LIBINSTALL: + client.installExtLibs(); + break; + case ALTER: + client.writeAsterixConfig(Utils.parseYarnClusterConfig(client.asterixConf)); + client.installAsterixConfig(true); + System.out.println("Configuration successfully modified"); + break; + case DESTROY: + try { + if (client.force + || Utils.confirmAction("Are you really sure you want to obliterate this instance? This action cannot be undone!")) { + app = client.makeApplicationContext(); + res = client.deployConfig(); + res.addAll(client.distributeBinaries()); + client.removeInstance(app, res); + } + } catch (YarnException | IOException e) { + LOG.error("Asterix failed to deploy on to cluster"); + throw e; + } + break; + case BACKUP: + if (client.force || Utils.confirmAction("Performing a backup will stop a running instance.")) { + app = client.makeApplicationContext(); + res = client.deployConfig(); + res.addAll(client.distributeBinaries()); + client.backupInstance(app, res); + } + break; + case LSBACKUP: + Utils.listBackups(client.conf, CONF_DIR_REL, client.instanceName); + break; + case RMBACKUP: + Utils.rmBackup(client.conf, CONF_DIR_REL, client.instanceName, Long.parseLong(client.snapName)); + break; + case RESTORE: + if (client.force || Utils.confirmAction("Performing a restore will stop a running instance.")) { + app = client.makeApplicationContext(); + res = client.deployConfig(); + res.addAll(client.distributeBinaries()); + client.restoreInstance(app, res); + } + break; + default: + LOG.fatal("Unknown mode. Known client modes are: start, stop, install, describe, kill, destroy, describe, backup, restore, lsbackup, rmbackup"); + client.printUsage(); + System.exit(-1); + } + } + + private static void startAction(AsterixYARNClient client) throws YarnException { + YarnClientApplication app; + List res; + ApplicationId appId; + try { + app = client.makeApplicationContext(); + res = client.deployConfig(); + res.addAll(client.distributeBinaries()); + appId = client.deployAM(app, res, client.mode); + LOG.info("Asterix started up with Application ID: " + appId.toString()); + if (Utils.waitForLiveness(appId, "Waiting for AsterixDB instance to resume ", client.yarnClient, + client.instanceName, client.conf, client.ccRestPort)) { + System.out.println("Asterix successfully deployed and is now running."); + } else { + LOG.fatal("AsterixDB appears to have failed to install and start"); + throw new YarnException("AsterixDB appears to have failed to install and start"); + } + } catch (IOException e) { + throw new YarnException(e); + } + } + + private static void installAction(AsterixYARNClient client) throws YarnException { + YarnClientApplication app; + List res; + ApplicationId appId; + try { + app = client.makeApplicationContext(); + client.installConfig(); + client.writeAsterixConfig(Utils.parseYarnClusterConfig(client.asterixConf)); + client.installAsterixConfig(false); + res = client.deployConfig(); + res.addAll(client.distributeBinaries()); + + appId = client.deployAM(app, res, client.mode); + LOG.info("Asterix started up with Application ID: " + appId.toString()); + if (Utils.waitForLiveness(appId, "Waiting for new AsterixDB Instance to start ", client.yarnClient, + client.instanceName, client.conf, client.ccRestPort)) { + System.out.println("Asterix successfully deployed and is now running."); + } else { + LOG.fatal("AsterixDB appears to have failed to install and start"); + throw new YarnException("AsterixDB appears to have failed to install and start"); + } + } catch (IOException e) { + LOG.fatal("Asterix failed to deploy on to cluster"); + throw new YarnException(e); + } + } + + public AsterixYARNClient(Configuration conf) throws Exception { + + this.conf = conf; + yarnClient = YarnClient.createYarnClient(); + //If the HDFS jars aren't on the classpath this won't be set + if (conf.get("fs.hdfs.impl", null) == conf.get("fs.file.impl", null)) { //only would happen if both are null + conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); + conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem"); + } + yarnClient.init(conf); + opts = parseConf(conf); + } + + private static Options parseConf(Configuration conf) { + Options opts = new Options(); + opts.addOption(new Option("appname", true, "Application Name. Default value - Asterix")); + opts.addOption(new Option("priority", true, "Application Priority. Default 0")); + opts.addOption(new Option("queue", true, "RM Queue in which this application is to be submitted")); + opts.addOption(new Option("master_memory", true, + "Amount of memory in MB to be requested to run the application master")); + opts.addOption(new Option("log_properties", true, "log4j.properties file")); + opts.addOption(new Option("n", "name", true, "Asterix instance name (required)")); + opts.addOption(new Option("zip", "asterixZip", true, + "zip file with AsterixDB inside- if in non-default location")); + opts.addOption(new Option("bc", "baseConfig", true, + "base Asterix parameters configuration file if not in default position")); + opts.addOption(new Option("c", "asterixConf", true, "Asterix cluster config (required on install)")); + opts.addOption(new Option("l", "externalLibs", true, "Libraries to deploy along with Asterix instance")); + opts.addOption(new Option("ld", "libDataverse", true, "Dataverse to deploy external libraries to")); + opts.addOption(new Option("r", "refresh", false, + "If starting an existing instance, this will replace them with the local copy on startup")); + opts.addOption(new Option("appId", true, "ApplicationID to monitor if running client in status monitor mode")); + opts.addOption(new Option("masterLibsDir", true, "Directory that contains the JARs needed to run the AM")); + opts.addOption(new Option("s", "snapshot", true, + "Backup timestamp for arguments requiring a specific backup (rm, restore)")); + opts.addOption(new Option("v", "debug", false, "Dump out debug information")); + opts.addOption(new Option("help", false, "Print usage")); + opts.addOption(new Option("f", "force", false, + "Execute this command as fully as possible, disregarding any caution")); + return opts; + } + + /** + */ + public AsterixYARNClient() throws Exception { + this(new YarnConfiguration()); + } + + /** + * Helper function to print out usage + */ + private void printUsage() { + new HelpFormatter().printHelp("Asterix YARN client. Usage: asterix [options] [mode]", opts); + } + + /** + * Initialize the client's arguments and parameters before execution. + * + * @param args + * - Standard command-line arguments. + * @throws ParseException + */ + public void init(String[] args) throws ParseException { + + CommandLine cliParser = new GnuParser().parse(opts, args); + if (cliParser.hasOption("help")) { + printUsage(); + return; + } + //initialize most things + debugFlag = cliParser.hasOption("debug"); + force = cliParser.hasOption("force"); + baseConfig = cliParser.getOptionValue("baseConfig"); + extLibs = cliParser.getOptionValue("externalLibs"); + libDataverse = cliParser.getOptionValue("libDataverse"); + + appName = cliParser.getOptionValue("appname", "AsterixDB"); + amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); + amQueue = cliParser.getOptionValue("queue", "default"); + amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10")); + + instanceName = cliParser.getOptionValue("name"); + instanceFolder = instanceName + '/'; + appName = appName + ": " + instanceName; + + asterixConf = cliParser.getOptionValue("asterixConf"); + + log4jPropFile = cliParser.getOptionValue("log_properties", ""); + + //see if the given argument values are sane in general + checkConfSanity(args, cliParser); + + //intialize the mode, see if it is a valid one. + initMode(args, cliParser); + + //now check the validity of the arguments given the mode + checkModeSanity(args, cliParser); + + //if we are going to refresh the binaries, find that out + refresh = cliParser.hasOption("refresh"); + //same goes for snapshot restoration/removal + snapName = cliParser.getOptionValue("snapshot"); + + if (!cliParser.hasOption("asterixZip") + && (mode == Mode.INSTALL || mode == Mode.ALTER || mode == Mode.DESTROY || mode == Mode.BACKUP)) { + + asterixZip = cliParser.getOptionValue("asterixZip", getAsterixDistributableLocation().getAbsolutePath()); + } else { + asterixZip = cliParser.getOptionValue("asterixZip"); + } + + } + + /** + * Cursory sanity checks for argument sanity, without considering the mode of the client + * + * @param args + * @param cliParser + * The parsed arguments. + * @throws ParseException + */ + private void checkConfSanity(String[] args, CommandLine cliParser) throws ParseException { + String message = null; + + //Sanity check for no args + if (args.length == 0) { + message = "No args specified for client to initialize"; + } + //AM memory should be a sane value + else if (amMemory < 0) { + message = "Invalid memory specified for application master, exiting." + " Specified memory=" + amMemory; + } + //we're good! + else { + return; + } + //default: + throw new ParseException(message); + + } + + /** + * Initialize the mode of the client from the arguments. + * + * @param args + * @param cliParser + * @throws ParseException + */ + private void initMode(String[] args, CommandLine cliParser) throws ParseException { + @SuppressWarnings("unchecked") + List clientVerb = cliParser.getArgList(); + String message = null; + //Now check if there is a mode + if (clientVerb == null || clientVerb.size() < 1) { + message = "You must specify an action."; + } + //But there can only be one mode... + else if (clientVerb.size() > 1) { + message = "Trailing arguments, or too many arguments. Only one action may be performed at a time."; + } + if (message != null) { + throw new ParseException(message); + } + //Now we can initialize the mode and check it against parameters + mode = Mode.fromAlias(clientVerb.get(0)); + if (mode == null) { + mode = Mode.NOOP; + } + } + + /** + * Determine if the command line arguments are sufficient for the requested client mode. + * + * @param args + * The command line arguments. + * @param cliParser + * Parsed command line arguments. + * @throws ParseException + */ + + private void checkModeSanity(String[] args, CommandLine cliParser) throws ParseException { + + String message = null; + //The only time you can use the client without specifiying an instance, is to list all of the instances it sees. + if (!cliParser.hasOption("name") && mode != Mode.DESCRIBE) { + message = "You must give a name for the instance to be acted upon"; + } else if (mode == Mode.INSTALL && !cliParser.hasOption("asterixConf")) { + message = "No Configuration XML given. Please specify a config for cluster installation"; + } else if (mode != Mode.START && cliParser.hasOption("refresh")) { + message = "Cannot specify refresh in any mode besides start, mode is: " + mode; + } else if (cliParser.hasOption("snapshot") && !(mode == Mode.RESTORE || mode == Mode.RMBACKUP)) { + message = "Cannot specify a snapshot to restore in any mode besides restore or rmbackup, mode is: " + mode; + } else if ((mode == Mode.ALTER || mode == Mode.INSTALL) && baseConfig == null + && !(new File(DEFAULT_PARAMETERS_PATH).exists())) { + message = "Default asterix parameters file is not in the default location, and no custom location is specified"; + } + //nothing is wrong, so exit + else { + return; + } + //otherwise, something is bad. + throw new ParseException(message); + + } + + /** + * Find the distributable asterix bundle, be it in the default location or in a user-specified location. + * + * @return + */ + private File getAsterixDistributableLocation() { + //Look in the PWD for the "asterix" folder + File tarDir = new File("asterix"); + if (!tarDir.exists()) { + throw new IllegalArgumentException( + "Default directory structure not in use- please specify an asterix zip and base config file to distribute"); + } + FileFilter tarFilter = new WildcardFileFilter("asterix-server*.zip"); + File[] tarFiles = tarDir.listFiles(tarFilter); + if (tarFiles.length != 1) { + throw new IllegalArgumentException( + "There is more than one canonically named asterix distributable in the default directory. Please leave only one there."); + } + return tarFiles[0]; + } + + /** + * Initialize and register the application attempt with the YARN ResourceManager. + * + * @return + * @throws IOException + * @throws YarnException + */ + public YarnClientApplication makeApplicationContext() throws IOException, YarnException { + + //first check to see if an instance already exists. + FileSystem fs = FileSystem.get(conf); + Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock); + LOG.info("Running Deployment"); + yarnClient.start(); + if (fs.exists(lock)) { + ApplicationId lockAppId = getLockFile(); + try { + ApplicationReport previousAppReport = yarnClient.getApplicationReport(lockAppId); + YarnApplicationState prevStatus = previousAppReport.getYarnApplicationState(); + if (!(prevStatus == YarnApplicationState.FAILED || prevStatus == YarnApplicationState.KILLED || prevStatus == YarnApplicationState.FINISHED) + && mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) { + throw new IllegalStateException("Instance is already running in: " + lockAppId); + } else if (mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) { + //stale lock file + LOG.warn("Stale lockfile detected. Instance attempt " + lockAppId + " may have exited abnormally"); + deleteLockFile(); + } + } catch (YarnException e) { + LOG.warn("Stale lockfile detected, but the RM has no record of this application's last run. This is normal if the cluster was restarted."); + deleteLockFile(); + } + } + + // Get a new application id + YarnClientApplication app = yarnClient.createApplication(); + GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); + int maxMem = appResponse.getMaximumResourceCapability().getMemory(); + LOG.info("Max mem capabililty of resources in this cluster " + maxMem); + + // A resource ask cannot exceed the max. + if (amMemory > maxMem) { + LOG.info("AM memory specified above max threshold of cluster. Using max value." + ", specified=" + amMemory + + ", max=" + maxMem); + amMemory = maxMem; + } + + // set the application name + ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); + appContext.setApplicationName(appName); + + return app; + } + + /** + * Upload the Asterix cluster description on to the DFS. This will persist the state of the instance. + * + * @return + * @throws YarnException + * @throws IOException + */ + private List deployConfig() throws YarnException, IOException { + + FileSystem fs = FileSystem.get(conf); + List resources = new ArrayList(2); + + String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME; + Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix); + FileStatus destStatus; + try { + destStatus = fs.getFileStatus(dstConf); + } catch (IOException e) { + throw new YarnException("Asterix instance by that name does not appear to exist in DFS"); + } + LocalResource asterixConfLoc = Records.newRecord(LocalResource.class); + asterixConfLoc.setType(LocalResourceType.FILE); + asterixConfLoc.setVisibility(LocalResourceVisibility.PRIVATE); + asterixConfLoc.setResource(ConverterUtils.getYarnUrlFromPath(dstConf)); + asterixConfLoc.setTimestamp(destStatus.getModificationTime()); + + DFSResourceCoordinate conf = new DFSResourceCoordinate(); + conf.envs.put(dstConf.toUri().toString(), AConstants.CONFLOCATION); + conf.envs.put(Long.toString(asterixConfLoc.getSize()), AConstants.CONFLEN); + conf.envs.put(Long.toString(asterixConfLoc.getTimestamp()), AConstants.CONFTIMESTAMP); + conf.name = CONFIG_DEFAULT_NAME; + conf.res = asterixConfLoc; + resources.add(conf); + + return resources; + + } + + /** + * Install the current Asterix parameters to the DFS. This can be modified via alter. + * + * @throws YarnException + * @throws IOException + */ + private void installConfig() throws YarnException, IOException { + FileSystem fs = FileSystem.get(conf); + String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME; + Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix); + try { + fs.getFileStatus(dstConf); + if (mode == Mode.INSTALL) { + throw new IllegalStateException("Instance with this name already exists."); + } + } catch (FileNotFoundException e) { + if (mode == Mode.START) { + throw new IllegalStateException("Instance does not exist for this user", e); + } + } + if (mode == Mode.INSTALL) { + Path src = new Path(asterixConf); + fs.copyFromLocalFile(false, true, src, dstConf); + } + + } + + /** + * Upload External libraries and functions to HDFS for an instance to use when started + * @throws IllegalStateException + * @throws IOException + */ + + private void installExtLibs() throws IllegalStateException, IOException { + FileSystem fs = FileSystem.get(conf); + if (!instanceExists()) { + throw new IllegalStateException("No instance by name " + instanceName + " found."); + } + if (isRunning()) { + throw new IllegalStateException("Instance " + instanceName + + " is running. Please stop it before installing any libraries."); + } + String libPathSuffix = CONF_DIR_REL + instanceFolder + "library" + Path.SEPARATOR + libDataverse + + Path.SEPARATOR; + Path src = new Path(extLibs); + String fullLibPath = libPathSuffix + src.getName(); + Path libFilePath = new Path(fs.getHomeDirectory(), fullLibPath); + LOG.info("Copying Asterix external library to DFS"); + fs.copyFromLocalFile(false, true, src, libFilePath); + } + + /** + * Finds the minimal classes and JARs needed to start the AM only. + * @return Resources the AM needs to start on the initial container. + * @throws IllegalStateException + * @throws IOException + */ + private List installAmLibs() throws IllegalStateException, IOException { + List resources = new ArrayList(2); + FileSystem fs = FileSystem.get(conf); + String fullLibPath = CONF_DIR_REL + instanceFolder + "am_jars" + Path.SEPARATOR; + String[] cp = System.getProperty("java.class.path").split(System.getProperty("path.separator")); + String asterixJarPattern = "^(asterix).*(jar)$"; //starts with asterix,ends with jar + String commonsJarPattern = "^(commons).*(jar)$"; + String surefireJarPattern = "^(surefire).*(jar)$"; //for maven tests + String jUnitTestPattern = "^(asterix-yarn" + File.separator + "target)$"; + + LOG.info(File.separator); + for (String j : cp) { + String[] pathComponents = j.split(Pattern.quote(File.separator)); + LOG.info(j); + LOG.info(pathComponents[pathComponents.length - 1]); + if (pathComponents[pathComponents.length - 1].matches(asterixJarPattern) + || pathComponents[pathComponents.length - 1].matches(commonsJarPattern) + || pathComponents[pathComponents.length - 1].matches(surefireJarPattern) + || pathComponents[pathComponents.length - 1].matches(jUnitTestPattern)) { + LOG.info("Loading JAR/classpath: " + j); + File f = new File(j); + Path dst = new Path(fs.getHomeDirectory(), fullLibPath + f.getName()); + if (!fs.exists(dst) || refresh) { + fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), dst); + } + FileStatus dstSt = fs.getFileStatus(dst); + LocalResource amLib = Records.newRecord(LocalResource.class); + amLib.setType(LocalResourceType.FILE); + amLib.setVisibility(LocalResourceVisibility.PRIVATE); + amLib.setResource(ConverterUtils.getYarnUrlFromPath(dst)); + amLib.setTimestamp(dstSt.getModificationTime()); + amLib.setSize(dstSt.getLen()); + DFSResourceCoordinate amLibCoord = new DFSResourceCoordinate(); + amLibCoord.res = amLib; + amLibCoord.name = f.getName(); + if (f.getName().contains("asterix-yarn") || f.getName().contains("surefire")) { + amLibCoord.envs.put(dst.toUri().toString(), AConstants.APPLICATIONMASTERJARLOCATION); + amLibCoord.envs.put(Long.toString(dstSt.getLen()), AConstants.APPLICATIONMASTERJARLEN); + amLibCoord.envs.put(Long.toString(dstSt.getModificationTime()), + AConstants.APPLICATIONMASTERJARTIMESTAMP); + } + resources.add(amLibCoord); + } + + } + if (resources.size() == 0) { + throw new IOException("Required JARs are missing. Please check your directory structure"); + } + return resources; + } + + /** + * Uploads a AsterixDB cluster configuration to HDFS for the AM to use. + * @param overwrite Overwrite existing configurations by the same name. + * @throws IllegalStateException + * @throws IOException + */ + private void installAsterixConfig(boolean overwrite) throws IllegalStateException, IOException { + FileSystem fs = FileSystem.get(conf); + File srcfile = new File(MERGED_PARAMETERS_PATH); + Path src = new Path(srcfile.getCanonicalPath()); + String pathSuffix = CONF_DIR_REL + instanceFolder + File.separator + PARAMS_DEFAULT_NAME; + Path dst = new Path(fs.getHomeDirectory(), pathSuffix); + if (fs.exists(dst) && !overwrite) { + + throw new IllegalStateException( + "Instance exists. Please delete an existing instance before trying to overwrite"); + } + fs.copyFromLocalFile(false, true, src, dst); + } + + /** + * Uploads binary resources to HDFS for use by the AM + * @return + * @throws IOException + * @throws YarnException + */ + public List distributeBinaries() throws IOException, YarnException { + + List resources = new ArrayList(2); + // Copy the application master jar to the filesystem + // Create a local resource to point to the destination jar path + FileSystem fs = FileSystem.get(conf); + Path src, dst; + FileStatus destStatus; + String pathSuffix; + + // adding info so we can add the jar to the App master container path + + // Add the asterix tarfile to HDFS for easy distribution + // Keep it all archived for now so add it as a file... + + pathSuffix = CONF_DIR_REL + instanceFolder + "asterix-server.zip"; + dst = new Path(fs.getHomeDirectory(), pathSuffix); + if (refresh) { + if (fs.exists(dst)) { + fs.delete(dst, false); + } + } + if (!fs.exists(dst)) { + src = new Path(asterixZip); + LOG.info("Copying Asterix distributable to DFS"); + fs.copyFromLocalFile(false, true, src, dst); + } + destStatus = fs.getFileStatus(dst); + LocalResource asterixTarLoc = Records.newRecord(LocalResource.class); + asterixTarLoc.setType(LocalResourceType.ARCHIVE); + asterixTarLoc.setVisibility(LocalResourceVisibility.PRIVATE); + asterixTarLoc.setResource(ConverterUtils.getYarnUrlFromPath(dst)); + asterixTarLoc.setTimestamp(destStatus.getModificationTime()); + + // adding info so we can add the tarball to the App master container path + DFSResourceCoordinate tar = new DFSResourceCoordinate(); + tar.envs.put(dst.toUri().toString(), AConstants.TARLOCATION); + tar.envs.put(Long.toString(asterixTarLoc.getSize()), AConstants.TARLEN); + tar.envs.put(Long.toString(asterixTarLoc.getTimestamp()), AConstants.TARTIMESTAMP); + tar.res = asterixTarLoc; + tar.name = "asterix-server.zip"; + resources.add(tar); + + // Set the log4j properties if needed + if (!log4jPropFile.isEmpty()) { + Path log4jSrc = new Path(log4jPropFile); + Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props"); + fs.copyFromLocalFile(false, true, log4jSrc, log4jDst); + FileStatus log4jFileStatus = fs.getFileStatus(log4jDst); + LocalResource log4jRsrc = Records.newRecord(LocalResource.class); + log4jRsrc.setType(LocalResourceType.FILE); + log4jRsrc.setVisibility(LocalResourceVisibility.PRIVATE); + log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri())); + log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime()); + log4jRsrc.setSize(log4jFileStatus.getLen()); + DFSResourceCoordinate l4j = new DFSResourceCoordinate(); + tar.res = log4jRsrc; + tar.name = "log4j.properties"; + resources.add(l4j); + } + + resources.addAll(installAmLibs()); + return resources; + } + + /** + * Submits the request to start the AsterixApplicationMaster to the YARN ResourceManager. + * + * @param app + * The application attempt handle. + * @param resources + * Resources to be distributed as part of the container launch + * @param mode + * The mode of the ApplicationMaster + * @return The application ID of the new Asterix instance. + * @throws IOException + * @throws YarnException + */ + + public ApplicationId deployAM(YarnClientApplication app, List resources, Mode mode) + throws IOException, YarnException { + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + + ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); + + // Set local resource info into app master container launch context + Map localResources = new HashMap(); + for (DFSResourceCoordinate res : resources) { + localResources.put(res.name, res.res); + } + amContainer.setLocalResources(localResources); + // Set the env variables to be setup in the env where the application + // master will be run + LOG.info("Set the environment for the application master"); + Map env = new HashMap(); + + // using the env info, the application master will create the correct + // local resource for the + // eventual containers that will be launched to execute the shell + // scripts + for (DFSResourceCoordinate res : resources) { + if (res.envs == null) { //some entries may not have environment variables. + continue; + } + for (Map.Entry e : res.envs.entrySet()) { + env.put(e.getValue(), e.getKey()); + } + } + //this is needed for when the RM address isn't known from the environment of the AM + env.put(AConstants.RMADDRESS, conf.get("yarn.resourcemanager.address")); + env.put(AConstants.RMSCHEDULERADDRESS, conf.get("yarn.resourcemanager.scheduler.address")); + ///add miscellaneous environment variables. + env.put(AConstants.INSTANCESTORE, CONF_DIR_REL + instanceFolder); + env.put(AConstants.DFS_BASE, FileSystem.get(conf).getHomeDirectory().toUri().toString()); + env.put(AConstants.CC_JAVA_OPTS, ccJavaOpts); + env.put(AConstants.NC_JAVA_OPTS, ncJavaOpts); + + // Add AppMaster.jar location to classpath + // At some point we should not be required to add + // the hadoop specific classpaths to the env. + // It should be provided out of the box. + // For now setting all required classpaths including + // the classpath to "." for the application jar + StringBuilder classPathEnv = new StringBuilder("").append("./*"); + for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + classPathEnv.append(File.pathSeparatorChar); + classPathEnv.append(c.trim()); + } + classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties"); + + // add the runtime classpath needed for tests to work + if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { + LOG.info("In YARN MiniCluster"); + classPathEnv.append(System.getProperty("path.separator")); + classPathEnv.append(System.getProperty("java.class.path")); + env.put("HADOOP_CONF_DIR", System.getProperty("user.dir") + File.separator + "target" + File.separator); + } + LOG.info("AM Classpath:" + classPathEnv.toString()); + env.put("CLASSPATH", classPathEnv.toString()); + + amContainer.setEnvironment(env); + + // Set the necessary command to execute the application master + Vector vargs = new Vector(30); + + // Set java executable command + LOG.info("Setting up app master command"); + vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java"); + // Set class name + vargs.add(appMasterMainClass); + //Set params for Application Master + if (debugFlag) { + vargs.add("-debug"); + } + if (mode == Mode.DESTROY) { + vargs.add("-obliterate"); + } + else if (mode == Mode.BACKUP) { + vargs.add("-backup"); + } + else if (mode == Mode.RESTORE) { + vargs.add("-restore " + snapName); + } + else if( mode == Mode.INSTALL){ + vargs.add("-initial "); + } + if (refresh) { + vargs.add("-refresh"); + } + //vargs.add("/bin/ls -alh asterix-server.zip/repo"); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "AppMaster.stdout"); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "AppMaster.stderr"); + // Get final commmand + StringBuilder command = new StringBuilder(); + for (CharSequence str : vargs) { + command.append(str).append(" "); + } + + LOG.info("Completed setting up app master command " + command.toString()); + List commands = new ArrayList(); + commands.add(command.toString()); + amContainer.setCommands(commands); + + // Set up resource type requirements + // For now, only memory is supported so we set memory requirements + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(amMemory); + appContext.setResource(capability); + + // Service data is a binary blob that can be passed to the application + // Not needed in this scenario + // amContainer.setServiceData(serviceData); + + // The following are not required for launching an application master + // amContainer.setContainerId(containerId); + + appContext.setAMContainerSpec(amContainer); + + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + // TODO - what is the range for priority? how to decide? + pri.setPriority(amPriority); + appContext.setPriority(pri); + + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue(amQueue); + + // Submit the application to the applications manager + // SubmitApplicationResponse submitResp = + // applicationsManager.submitApplication(appRequest); + // Ignore the response as either a valid response object is returned on + // success + // or an exception thrown to denote some form of a failure + LOG.info("Submitting application to ASM"); + + yarnClient.submitApplication(appContext); + + //now write the instance lock + if (mode == Mode.INSTALL || mode == Mode.START) { + FileSystem fs = FileSystem.get(conf); + Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock); + if (fs.exists(lock)) { + throw new IllegalStateException("Somehow, this instance has been launched twice. "); + } + BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(lock, true))); + try { + out.write(app.getApplicationSubmissionContext().getApplicationId().toString()); + out.close(); + } finally { + out.close(); + } + } + return app.getApplicationSubmissionContext().getApplicationId(); + + } + + /** + * Asks YARN to kill a given application by appId + * @param appId The application to kill. + * @param yarnClient The YARN client object that is connected to the RM. + * @throws YarnException + * @throws IOException + */ + + public static void killApplication(ApplicationId appId, YarnClient yarnClient) throws YarnException, IOException { + if (appId == null) { + throw new YarnException("No Application given to kill"); + } + if (yarnClient.isInState(STATE.INITED)) { + yarnClient.start(); + } + YarnApplicationState st; + ApplicationReport rep = yarnClient.getApplicationReport(appId); + st = rep.getYarnApplicationState(); + if (st == YarnApplicationState.FINISHED || st == YarnApplicationState.KILLED + || st == YarnApplicationState.FAILED) { + LOG.info("Application " + appId + " already exited."); + return; + } + LOG.info("Killing applicaiton with ID: " + appId); + yarnClient.killApplication(appId); + + } + + /** + * Tries to stop a running AsterixDB instance gracefully. + * @throws IOException + * @throws YarnException + */ + private void stopInstanceIfRunning() + throws IOException, YarnException { + FileSystem fs = FileSystem.get(conf); + String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME; + Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix); + //if the instance is up, fix that + if (isRunning()) { + try { + this.stopInstance(); + } catch (IOException e) { + throw new YarnException(e); + } + } else if (!fs.exists(dstConf)) { + throw new YarnException("No instance configured with that name exists"); + } + } + + /** + * Start a YARN job to delete local AsterixDB resources of an extant instance + * @param app The Client connection + * @param resources AM resources + * @throws IOException + * @throws YarnException + */ + + private void removeInstance(YarnClientApplication app, List resources) throws IOException, + YarnException { + FileSystem fs = FileSystem.get(conf); + //if the instance is up, fix that + stopInstanceIfRunning(); + //now try deleting all of the on-disk artifacts on the cluster + ApplicationId deleter = deployAM(app, resources, Mode.DESTROY); + boolean delete_start = Utils.waitForApplication(deleter, yarnClient, "Waiting for deletion to start", ccRestPort); + if (!delete_start) { + if (force) { + fs.delete(new Path(CONF_DIR_REL + instanceFolder), true); + LOG.error("Forcing deletion of HDFS resources"); + } + LOG.fatal(" of on-disk persistient resources on individual nodes failed."); + throw new YarnException(); + } + boolean deleted = waitForCompletion(deleter, "Deletion in progress"); + if (!(deleted || force)) { + LOG.fatal("Cleanup of on-disk persistent resources failed."); + return; + } else { + fs.delete(new Path(CONF_DIR_REL + instanceFolder), true); + } + System.out.println("Deletion of instance succeeded."); + + } + + /** + * Start a YARN job to copy all data-containing resources of an AsterixDB instance to HDFS + * @param app + * @param resources + * @throws IOException + * @throws YarnException + */ + + private void backupInstance(YarnClientApplication app, List resources) throws IOException, + YarnException { + stopInstanceIfRunning(); + ApplicationId backerUpper = deployAM(app, resources, Mode.BACKUP); + boolean backupStart; + backupStart = Utils.waitForApplication(backerUpper, yarnClient, "Waiting for backup " + backerUpper.toString() + + "to start", ccRestPort); + if (!backupStart) { + LOG.fatal("Backup failed to start"); + throw new YarnException(); + } + boolean complete; + complete = waitForCompletion(backerUpper, "Backup in progress"); + if (!complete) { + LOG.fatal("Backup failed- timeout waiting for completion"); + return; + } + System.out.println("Backup of instance succeeded."); + } + + /** + * Start a YARN job to copy a set of resources from backupInstance to restore the state of an extant AsterixDB instance + * @param app + * @param resources + * @throws IOException + * @throws YarnException + */ + + private void restoreInstance(YarnClientApplication app, List resources) throws IOException, + YarnException { + stopInstanceIfRunning(); + ApplicationId restorer = deployAM(app, resources, Mode.RESTORE); + boolean restoreStart = Utils.waitForApplication(restorer, yarnClient, "Waiting for restore to start", ccRestPort); + if (!restoreStart) { + LOG.fatal("Restore failed to start"); + throw new YarnException(); + } + boolean complete = waitForCompletion(restorer, "Restore in progress"); + if (!complete) { + LOG.fatal("Restore failed- timeout waiting for completion"); + return; + } + System.out.println("Restoration of instance succeeded."); + } + + /** + * Stops the instance and remove the lockfile to allow a restart. + * + * @throws IOException + * @throws JAXBException + * @throws YarnException + */ + + private void stopInstance() throws IOException, YarnException { + ApplicationId appId = getLockFile(); + //get CC rest API port if it is nonstandard + readConfigParams(locateConfig()); + if (yarnClient.isInState(STATE.INITED)) { + yarnClient.start(); + } + System.out.println("Stopping instance " + instanceName); + if (!isRunning()) { + LOG.fatal("AsterixDB instance by that name is stopped already"); + return; + } + try { + String ccIp = Utils.getCCHostname(instanceName, conf); + Utils.sendShutdownCall(ccIp,ccRestPort); + } catch (IOException e) { + LOG.error("Error while trying to issue safe shutdown:", e); + } + //now make sure it is actually gone and not "stuck" + String message = "Waiting for AsterixDB to shut down"; + boolean completed = waitForCompletion(appId, message); + if (!completed && force) { + LOG.warn("Instance failed to stop gracefully, now killing it"); + try { + AsterixYARNClient.killApplication(appId, yarnClient); + completed = true; + } catch (YarnException e1) { + LOG.fatal("Could not stop nor kill instance gracefully.",e1); + return; + } + } + if (completed) { + deleteLockFile(); + } + } + + private void deleteLockFile() throws IOException { + if (instanceName == null || instanceName == "") { + return; + } + FileSystem fs = FileSystem.get(conf); + Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceName + '/' + instanceLock); + if (fs.exists(lockPath)) { + fs.delete(lockPath, false); + } + } + + private boolean instanceExists() throws IOException { + FileSystem fs = FileSystem.get(conf); + String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME; + Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix); + return fs.exists(dstConf); + } + + private boolean isRunning() throws IOException { + FileSystem fs = FileSystem.get(conf); + String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME; + Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix); + if (fs.exists(dstConf)) { + Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock); + return fs.exists(lock); + } else { + return false; + } + } + + private ApplicationId getLockFile() throws IOException, YarnException { + if (instanceFolder == "") { + throw new IllegalStateException("Instance name not given."); + } + FileSystem fs = FileSystem.get(conf); + Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock); + if (!fs.exists(lockPath)) { + throw new YarnException("Instance appears to not be running. If you know it is, try using kill"); + } + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(lockPath))); + String lockAppId = br.readLine(); + br.close(); + return ConverterUtils.toApplicationId(lockAppId); + } + + public static ApplicationId getLockFile(String instanceName, Configuration conf) throws IOException { + if (instanceName == "") { + throw new IllegalStateException("Instance name not given."); + } + FileSystem fs = FileSystem.get(conf); + Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceName + '/' + instanceLock); + if (!fs.exists(lockPath)) { + return null; + } + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(lockPath))); + String lockAppId = br.readLine(); + br.close(); + return ConverterUtils.toApplicationId(lockAppId); + } + + /** + * Locate the Asterix parameters file. + * @return + * @throws FileNotFoundException + * @throws IOException + */ + private AsterixConfiguration locateConfig() throws FileNotFoundException, IOException{ + AsterixConfiguration configuration; + String configPathBase = MERGED_PARAMETERS_PATH; + if (baseConfig != null) { + configuration = Utils.loadAsterixConfig(baseConfig); + configPathBase = new File(baseConfig).getParentFile().getAbsolutePath() + File.separator + + PARAMS_DEFAULT_NAME; + MERGED_PARAMETERS_PATH = configPathBase; + } else { + configuration = Utils.loadAsterixConfig(DEFAULT_PARAMETERS_PATH); + } + return configuration; + } + + /** + * + */ + private void readConfigParams(AsterixConfiguration configuration){ + //this is the "base" config that is inside the zip, we start here + for (edu.uci.ics.asterix.common.configuration.Property property : configuration.getProperty()) { + if (property.getName().equalsIgnoreCase(CC_JAVA_OPTS_KEY)) { + ccJavaOpts = property.getValue(); + } else if (property.getName().equalsIgnoreCase(NC_JAVA_OPTS_KEY)) { + ncJavaOpts = property.getValue(); + } else if(property.getName().equalsIgnoreCase(CC_REST_PORT_KEY)){ + ccRestPort = Integer.parseInt(property.getValue()); + } + + } + } + + /** + * Retrieves necessary information from the cluster configuration and splices it into the Asterix configuration parameters + * @param cluster + * @throws FileNotFoundException + * @throws IOException + */ + + private void writeAsterixConfig(Cluster cluster) throws FileNotFoundException, IOException { + String metadataNodeId = Utils.getMetadataNode(cluster).getId(); + String asterixInstanceName = instanceName; + + AsterixConfiguration configuration = locateConfig(); + + readConfigParams(configuration); + + String version = Utils.getAsterixVersionFromClasspath(); + configuration.setVersion(version); + + configuration.setInstanceName(asterixInstanceName); + String storeDir = null; + List stores = new ArrayList(); + for (Node node : cluster.getNode()) { + storeDir = node.getStore() == null ? cluster.getStore() : node.getStore(); + stores.add(new Store(node.getId(), storeDir)); + } + configuration.setStore(stores); + + List coredump = new ArrayList(); + String coredumpDir = null; + List txnLogDirs = new ArrayList(); + String txnLogDir = null; + for (Node node : cluster.getNode()) { + coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir(); + coredump.add(new Coredump(node.getId(), coredumpDir + "coredump" + File.separator)); + txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir(); //node or cluster-wide + txnLogDirs.add(new TransactionLogDir(node.getId(), txnLogDir + + (txnLogDir.charAt(txnLogDir.length() - 1) == File.separatorChar ? File.separator : "") + + "txnLogs" //if the string doesn't have a trailing / add one + + File.separator)); + } + configuration.setMetadataNode(metadataNodeId); + + configuration.setCoredump(coredump); + configuration.setTransactionLogDir(txnLogDirs); + FileOutputStream os = new FileOutputStream(MERGED_PARAMETERS_PATH); + try { + JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class); + Marshaller marshaller = ctx.createMarshaller(); + marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); + + marshaller.marshal(configuration, os); + } catch (JAXBException e) { + throw new IOException(e); + } finally { + os.close(); + } + } + + private boolean waitForCompletion(ApplicationId appId, String message) throws YarnException, IOException { + return Utils.waitForApplication(appId, yarnClient, message, ccRestPort); + } + + private class DFSResourceCoordinate { + String name; + LocalResource res; + Map envs; + + public DFSResourceCoordinate() { + envs = new HashMap(3); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/69375a19/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java ---------------------------------------------------------------------- diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java new file mode 100644 index 0000000..55a9d0b --- /dev/null +++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java @@ -0,0 +1,31 @@ +package edu.uci.ics.asterix.aoya; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; + +public class Deleter { + private static final Log LOG = LogFactory.getLog(Deleter.class); + + public static void main(String[] args) throws IOException { + + LogManager.getRootLogger().setLevel(Level.DEBUG); + + LOG.info("Obliterator args: " + Arrays.toString(args)); + for (int i = 0; i < args.length; i++) { + File f = new File(args[i]); + if (f.exists()) { + LOG.info("Deleting: " + f.getPath()); + FileUtils.deleteDirectory(f); + } else { + LOG.error("Could not find file to delete: " + f.getPath()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/69375a19/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java ---------------------------------------------------------------------- diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java new file mode 100644 index 0000000..13c9a6e --- /dev/null +++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java @@ -0,0 +1,94 @@ +package edu.uci.ics.asterix.aoya; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +public class HDFSBackup { + Configuration conf = new YarnConfiguration(); + private static final Log LOG = LogFactory.getLog(AsterixApplicationMaster.class); + boolean restore = false; + boolean backup = false; + + public static void main(String[] args) throws ParseException, IllegalArgumentException, IOException { + + HDFSBackup back = new HDFSBackup(); + Map envs = System.getenv(); + if(envs.containsKey("HADOOP_CONF_DIR")){ + File hadoopConfDir = new File(envs.get("HADOOP_CONF_DIR")); + if(hadoopConfDir.isDirectory()){ + for(File config: hadoopConfDir.listFiles()){ + if(config.getName().matches("^.*(xml)$")){ + back.conf.addResource(new Path(config.getAbsolutePath())); + } + } + } + } + Options opts = new Options(); + opts.addOption("restore", false, ""); + opts.addOption("backup", false, ""); + CommandLine cliParser = new GnuParser().parse(opts, args); + if (cliParser.hasOption("restore")) { + back.restore = true; + } + if (cliParser.hasOption("backup")) { + back.backup = true; + } + @SuppressWarnings("unchecked") + List pairs = (List) cliParser.getArgList(); + + List sources = new ArrayList(10); + for (String p : pairs) { + String[] s = p.split(","); + sources.add(new Path[] { new Path(s[0]), new Path(s[1]) }); + } + + try { + if (back.backup) { + back.performBackup(sources); + } + if (back.restore) { + back.performRestore(sources); + } + } catch (IOException e) { + back.LOG.fatal("Backup/restoration unsuccessful: " + e.getMessage()); + throw e; + } + } + + private void performBackup(List paths) throws IOException { + FileSystem fs = FileSystem.get(conf); + for (Path[] p : paths) { + LOG.info("Backing up " + p[0] + " to " + p[1] + "."); + fs.copyFromLocalFile(p[0], p[1]); + } + } + + private void performRestore(List paths) throws IOException { + FileSystem fs = FileSystem.get(conf); + for (Path[] p : paths) { + LOG.info("Restoring " + p[0] + " to " + p[1] + "."); + File f = new File(p[1].toString() + File.separator + p[0].getName()); + LOG.info(f.getAbsolutePath()); + if (f.exists()) { + FileUtils.deleteDirectory(f); + } + LOG.info(f.exists()); + fs.copyToLocalFile(false, p[0], p[1], true); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/69375a19/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java ---------------------------------------------------------------------- diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java new file mode 100644 index 0000000..4eb8bc3 --- /dev/null +++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java @@ -0,0 +1,462 @@ +package edu.uci.ics.asterix.aoya; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.ConnectException; +import java.util.ArrayList; +import java.util.List; +import java.util.Scanner; +import java.util.regex.Pattern; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; + +import org.apache.commons.httpclient.*; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.params.HttpMethodParams; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import edu.uci.ics.asterix.common.configuration.AsterixConfiguration; +import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster; +import edu.uci.ics.asterix.event.schema.yarnCluster.Node; + +public class Utils { + + private Utils() { + + } + + private static final String CONF_DIR_REL = AsterixYARNClient.CONF_DIR_REL; + + public static String hostFromContainerID(String containerID) { + return containerID.split("_")[4]; + } + + /** + * Gets the metadata node from an AsterixDB cluster description file + * + * @param cluster + * The cluster description in question. + * @return + */ + public static Node getMetadataNode(Cluster cluster) { + Node metadataNode = null; + if (cluster.getMetadataNode() != null) { + for (Node node : cluster.getNode()) { + if (node.getId().equals(cluster.getMetadataNode())) { + metadataNode = node; + break; + } + } + } else { + //I will pick one for you. + metadataNode = cluster.getNode().get(1); + } + return metadataNode; + } + + /** + * Sends a "poison pill" message to an AsterixDB instance for it to shut down safely. + * + * @param host + * The host to shut down. + * @throws IOException + */ + + public static void sendShutdownCall(String host, int port) throws IOException { + final String url = "http://" + host + ":" + port + "/admin/shutdown"; + PostMethod method = new PostMethod(url); + try { + executeHTTPCall(method); + } catch (NoHttpResponseException e) { + //do nothing... this is expected + } + //now let's test that the instance is really down, or throw an exception + try { + executeHTTPCall(method); + } catch (ConnectException e) { + return; + } + throw new IOException("Instance did not shut down cleanly."); + } + + /** + * Simple test via the AsterixDB Javascript API to determine if an instance is truly live or not. + * Queries the Metadata dataset and returns true if the query completes successfully, false otherwise. + * + * @param host + * The host to run the query against + * @return + * True if the instance is OK, false otherwise. + * @throws IOException + */ + public static boolean probeLiveness(String host, int port) throws IOException { + final String url = "http://" + host + ":" + port + "/query"; + final String test = "for $x in dataset Metadata.Dataset return $x;"; + GetMethod method = new GetMethod(url); + method.setQueryString(new NameValuePair[] { new NameValuePair("query", test) }); + InputStream response; + try { + response = executeHTTPCall(method); + } catch (ConnectException e) { + return false; + } + if (response == null) { + return false; + } + BufferedReader br = new BufferedReader(new InputStreamReader(response)); + String result = br.readLine(); + if (result == null) { + return false; + } + if(method.getStatusCode() != HttpStatus.SC_OK){ + return false; + } + return true; + } + + private static InputStream executeHTTPCall(HttpMethod method) throws HttpException, IOException { + HttpClient client = new HttpClient(); + HttpMethodRetryHandler noop = new HttpMethodRetryHandler() { + @Override + public boolean retryMethod(final HttpMethod method, final IOException exception, int executionCount) { + return false; + } + }; + client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, noop); + client.executeMethod(method); + return method.getResponseBodyAsStream(); + } + + //** + + public static String makeDots(int iter) { + int pos = iter % 3; + char[] dots = { ' ', ' ', ' ' }; + dots[pos] = '.'; + return new String(dots); + } + + public static boolean confirmAction(String warning) { + System.out.println(warning); + System.out.print("Are you sure you want to do this? (yes/no): "); + Scanner in = new Scanner(System.in); + while (true) { + try { + String input = in.nextLine(); + if ("yes".equals(input)) { + return true; + } else if ("no".equals(input)) { + return false; + } else { + System.out.println("Please type yes or no"); + } + } finally { + in.close(); + } + } + } + + /** + * Lists the deployed instances of AsterixDB on a YARN cluster + * + * @param conf + * Hadoop configuration object + * @param confDirRel + * Relative AsterixDB configuration path for DFS + * @throws IOException + */ + + public static void listInstances(Configuration conf, String confDirRel) throws IOException { + FileSystem fs = FileSystem.get(conf); + Path instanceFolder = new Path(fs.getHomeDirectory(), confDirRel); + if (!fs.exists(instanceFolder)) { + System.out.println("No running or stopped AsterixDB instances exist in this cluster."); + return; + } + FileStatus[] instances = fs.listStatus(instanceFolder); + if (instances.length != 0) { + System.out.println("Existing AsterixDB instances: "); + for (int i = 0; i < instances.length; i++) { + FileStatus st = instances[i]; + String name = st.getPath().getName(); + ApplicationId lockFile = AsterixYARNClient.getLockFile(name, conf); + if (lockFile != null) { + System.out.println("Instance " + name + " is running with Application ID: " + lockFile.toString()); + } else { + System.out.println("Instance " + name + " is stopped"); + } + } + } else { + System.out.println("No running or stopped AsterixDB instances exist in this cluster"); + } + } + + /** + * Lists the backups in the DFS. + * + * @param conf + * YARN configuration + * @param confDirRel + * Relative config path + * @param instance + * Instance name + * @throws IOException + */ + public static void listBackups(Configuration conf, String confDirRel, String instance) throws IOException { + List backups = getBackups(conf,confDirRel,instance); + if (backups.size() != 0) { + System.out.println("Backups for instance " + instance + ": "); + for (String name : backups) { + System.out.println("Backup: " + name); + } + } else { + System.out.println("No backups found for instance " + instance + "."); + } + } + /** + * Return the available snapshot names + * @param conf + * @param confDirRel + * @param instance + * @return + * @throws IOException + */ + public static List getBackups(Configuration conf, String confDirRel, String instance) throws IOException{ + FileSystem fs = FileSystem.get(conf); + Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups"); + FileStatus[] backups = fs.listStatus(backupFolder); + List backupNames = new ArrayList(); + for(FileStatus f: backups){ + backupNames.add(f.getPath().getName()); + } + return backupNames; + } + + /** + * Removes backup snapshots from the DFS + * + * @param conf + * DFS Configuration + * @param confDirRel + * Configuration relative directory + * @param instance + * The asterix instance name + * @param timestamp + * The snapshot timestap (ID) + * @throws IOException + */ + public static void rmBackup(Configuration conf, String confDirRel, String instance, long timestamp) + throws IOException { + FileSystem fs = FileSystem.get(conf); + Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups"); + FileStatus[] backups = fs.listStatus(backupFolder); + if (backups.length != 0) { + System.out.println("Backups for instance " + instance + ": "); + } else { + System.out.println("No backups found for instance " + instance + "."); + } + for (FileStatus f : backups) { + String name = f.getPath().getName(); + long file_ts = Long.parseLong(name); + if (file_ts == timestamp) { + System.out.println("Deleting backup " + timestamp); + if (!fs.delete(f.getPath(), true)) { + System.out.println("Backup could not be deleted"); + return; + } else { + return; + } + } + } + System.out.println("No backup found with specified timestamp"); + + } + + /** + * Simply parses out the YARN cluster config and instantiates it into a nice object. + * + * @return The object representing the configuration + * @throws FileNotFoundException + * @throws JAXBException + */ + public static Cluster parseYarnClusterConfig(String path) throws YarnException { + try { + File f = new File(path); + JAXBContext configCtx = JAXBContext.newInstance(Cluster.class); + Unmarshaller unmarshaller = configCtx.createUnmarshaller(); + Cluster cl = (Cluster) unmarshaller.unmarshal(f); + return cl; + } catch (JAXBException e) { + throw new YarnException(e); + } + } + + public static void writeYarnClusterConfig(String path, Cluster cl) throws YarnException { + try { + File f = new File(path); + JAXBContext configCtx = JAXBContext.newInstance(Cluster.class); + Marshaller marhsaller = configCtx.createMarshaller(); + marhsaller.marshal(cl, f); + } catch (JAXBException e) { + throw new YarnException(e); + } + } + + /** + * Looks in the current class path for AsterixDB libraries and gets the version number from the name of the first match. + * + * @return The version found, as a string. + */ + + public static String getAsterixVersionFromClasspath() { + String[] cp = System.getProperty("java.class.path").split(System.getProperty("path.separator")); + String asterixJarPattern = "^(asterix).*(jar)$"; //starts with asterix,ends with jar + + for (String j : cp) { + //escape backslashes for windows + String[] pathComponents = j.split(Pattern.quote(File.separator)); + if (pathComponents[pathComponents.length - 1].matches(asterixJarPattern)) { + //get components of maven version + String[] byDash = pathComponents[pathComponents.length - 1].split("-"); + //get the version number but remove the possible '.jar' tailing it + String version = (byDash[2].split("\\."))[0]; + //SNAPSHOT suffix + if (byDash.length == 4) { + //do the same if it's a snapshot suffix + return version + '-' + (byDash[3].split("\\."))[0]; + } + //stable version + return version; + } + } + return null; + } + + public static boolean waitForLiveness(ApplicationId appId, boolean probe, boolean print, String message, + YarnClient yarnClient, String instanceName, Configuration conf, int port) throws YarnException { + ApplicationReport report; + try { + report = yarnClient.getApplicationReport(appId); + } catch (IOException e) { + throw new YarnException(e); + } + YarnApplicationState st = report.getYarnApplicationState(); + for (int i = 0; i < 120; i++) { + if (st != YarnApplicationState.RUNNING) { + try { + report = yarnClient.getApplicationReport(appId); + st = report.getYarnApplicationState(); + if (print) { + System.out.print(message + Utils.makeDots(i) + "\r"); + } + Thread.sleep(1000); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } catch (IOException e1) { + throw new YarnException(e1); + } + if (st == YarnApplicationState.FAILED || st == YarnApplicationState.FINISHED + || st == YarnApplicationState.KILLED) { + return false; + } + } + if (probe) { + String host; + host = getCCHostname(instanceName, conf); + try { + for (int j = 0; j < 60; j++) { + if (!Utils.probeLiveness(host, port)) { + try { + if (print) { + System.out.print(message + Utils.makeDots(i) + "\r"); + } + Thread.sleep(1000); + } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); + } + } else { + if (print) { + System.out.println(""); + } + return true; + } + } + } catch (IOException e1) { + throw new YarnException(e1); + } + } else { + if (print) { + System.out.println(""); + } + return true; + } + } + if (print) { + System.out.println(""); + } + return false; + } + + public static boolean waitForLiveness(ApplicationId appId, String message, YarnClient yarnClient, + String instanceName, Configuration conf, int port) throws YarnException, IOException { + return waitForLiveness(appId, true, true, message, yarnClient, instanceName, conf, port); + } + + public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, String message, int port) + throws YarnException, IOException { + return waitForLiveness(appId, false, true, message, yarnClient, "", null, port); + } + + public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, int port) throws YarnException, + IOException, JAXBException { + return waitForLiveness(appId, false, false, "", yarnClient, "", null, port); + } + + public static String getCCHostname(String instanceName, Configuration conf) throws YarnException { + try { + FileSystem fs = FileSystem.get(conf); + String instanceFolder = instanceName + "/"; + String pathSuffix = CONF_DIR_REL + instanceFolder + "cluster-config.xml"; + Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix); + File tmp = File.createTempFile("cluster-config", "xml"); + tmp.deleteOnExit(); + fs.copyToLocalFile(dstConf, new Path(tmp.getPath())); + JAXBContext clusterConf = JAXBContext.newInstance(Cluster.class); + Unmarshaller unm = clusterConf.createUnmarshaller(); + Cluster cl = (Cluster) unm.unmarshal(tmp); + String ccIp = cl.getMasterNode().getClientIp(); + return ccIp; + } catch (IOException | JAXBException e) { + throw new YarnException(e); + } + } + + public static AsterixConfiguration loadAsterixConfig(String path) throws IOException { + File f = new File(path); + try { + JAXBContext configCtx = JAXBContext.newInstance(AsterixConfiguration.class); + Unmarshaller unmarshaller = configCtx.createUnmarshaller(); + AsterixConfiguration conf = (AsterixConfiguration) unmarshaller.unmarshal(f); + return conf; + } catch (JAXBException e) { + throw new IOException(e); + } + } + +}