apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From david...@apache.org
Subject [3/5] incubator-apex-core git commit: APEXCORE-340 Naming change of apex script. Changed following (dt->apex): 1. Script name changed to apex. 2. Class name changed to ApexCli from DTCli. Similar for test class. 3. Changed console prompt to "apex>" from
Date Fri, 15 Apr 2016 20:07:36 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0b124a5f/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
new file mode 100644
index 0000000..87e4bdc
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -0,0 +1,3950 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.cli;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.security.PrivilegedExceptionAction;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import javax.validation.constraints.NotNull;
+import javax.ws.rs.core.MediaType;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.log4j.Appender;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.tools.ant.DirectoryScanner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.sun.jersey.api.client.WebResource;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.stram.StramClient;
+import com.datatorrent.stram.client.AppPackage;
+import com.datatorrent.stram.client.AppPackage.AppInfo;
+import com.datatorrent.stram.client.ConfigPackage;
+import com.datatorrent.stram.client.DTConfiguration;
+import com.datatorrent.stram.client.DTConfiguration.Scope;
+import com.datatorrent.stram.client.RecordingsAgent;
+import com.datatorrent.stram.client.RecordingsAgent.RecordingInfo;
+import com.datatorrent.stram.client.StramAgent;
+import com.datatorrent.stram.client.StramAppLauncher;
+import com.datatorrent.stram.client.StramAppLauncher.AppFactory;
+import com.datatorrent.stram.client.StramClientUtils;
+import com.datatorrent.stram.client.StramClientUtils.ClientRMHelper;
+import com.datatorrent.stram.codec.LogicalPlanSerializer;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.requests.AddStreamSinkRequest;
+import com.datatorrent.stram.plan.logical.requests.CreateOperatorRequest;
+import com.datatorrent.stram.plan.logical.requests.CreateStreamRequest;
+import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
+import com.datatorrent.stram.plan.logical.requests.RemoveOperatorRequest;
+import com.datatorrent.stram.plan.logical.requests.RemoveStreamRequest;
+import com.datatorrent.stram.plan.logical.requests.SetOperatorAttributeRequest;
+import com.datatorrent.stram.plan.logical.requests.SetOperatorPropertyRequest;
+import com.datatorrent.stram.plan.logical.requests.SetPortAttributeRequest;
+import com.datatorrent.stram.plan.logical.requests.SetStreamAttributeRequest;
+import com.datatorrent.stram.security.StramUserLogin;
+import com.datatorrent.stram.util.JSONSerializationProvider;
+import com.datatorrent.stram.util.VersionInfo;
+import com.datatorrent.stram.util.WebServicesClient;
+import com.datatorrent.stram.webapp.OperatorDiscoverer;
+import com.datatorrent.stram.webapp.StramWebServices;
+import com.datatorrent.stram.webapp.TypeDiscoverer;
+
+import jline.console.ConsoleReader;
+import jline.console.completer.AggregateCompleter;
+import jline.console.completer.ArgumentCompleter;
+import jline.console.completer.Completer;
+import jline.console.completer.FileNameCompleter;
+import jline.console.completer.StringsCompleter;
+import jline.console.history.FileHistory;
+import jline.console.history.History;
+import jline.console.history.MemoryHistory;
+import net.lingala.zip4j.exception.ZipException;
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
+/**
+ * Provides command line interface for a streaming application on hadoop (yarn)
+ * <p>
+ *
+ * @since 0.3.2
+ */
+@SuppressWarnings("UseOfSystemOutOrSystemErr")
+public class ApexCli
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ApexCli.class);
+  private Configuration conf;
+  private FileSystem fs;
+  private StramAgent stramAgent;
+  private final YarnClient yarnClient = YarnClient.createYarnClient();
+  private ApplicationReport currentApp = null;
+  private boolean consolePresent;
+  private String[] commandsToExecute;
+  private final Map<String, CommandSpec> globalCommands = new TreeMap<>();
+  private final Map<String, CommandSpec> connectedCommands = new TreeMap<>();
+  private final Map<String, CommandSpec> logicalPlanChangeCommands = new TreeMap<>();
+  private final Map<String, String> aliases = new HashMap<>();
+  private final Map<String, List<String>> macros = new HashMap<>();
+  private boolean changingLogicalPlan = false;
+  private final List<LogicalPlanRequest> logicalPlanRequestQueue = new ArrayList<>();
+  private FileHistory topLevelHistory;
+  private FileHistory changingLogicalPlanHistory;
+  private String jsonp;
+  private boolean raw = false;
+  private RecordingsAgent recordingsAgent;
+  private final ObjectMapper mapper = new JSONSerializationProvider().getContext(null);
+  private String pagerCommand;
+  private Process pagerProcess;
+  private int verboseLevel = 0;
+  private final Tokenizer tokenizer = new Tokenizer();
+  private final Map<String, String> variableMap = new HashMap<>();
+  private static boolean lastCommandError = false;
+  private Thread mainThread;
+  private Thread commandThread;
+  private String prompt;
+  private String forcePrompt;
+  private String kerberosPrincipal;
+  private String kerberosKeyTab;
+
+  private static class FileLineReader extends ConsoleReader
+  {
+    private final BufferedReader br;
+
+    FileLineReader(String fileName) throws IOException
+    {
+      super();
+      fileName = expandFileName(fileName, true);
+      br = new BufferedReader(new FileReader(fileName));
+    }
+
+    @Override
+    public String readLine(String prompt) throws IOException
+    {
+      return br.readLine();
+    }
+
+    @Override
+    public String readLine(String prompt, Character mask) throws IOException
+    {
+      return br.readLine();
+    }
+
+    @Override
+    public String readLine(Character mask) throws IOException
+    {
+      return br.readLine();
+    }
+
+    public void close() throws IOException
+    {
+      br.close();
+    }
+
+  }
+
+  public class Tokenizer
+  {
+    private void appendToCommandBuffer(List<String> commandBuffer, StringBuffer buf, boolean potentialEmptyArg)
+    {
+      if (potentialEmptyArg || buf.length() > 0) {
+        commandBuffer.add(buf.toString());
+        buf.setLength(0);
+      }
+    }
+
+    private List<String> startNewCommand(LinkedList<List<String>> resultBuffer)
+    {
+      List<String> newCommand = new ArrayList<>();
+      if (!resultBuffer.isEmpty()) {
+        List<String> lastCommand = resultBuffer.peekLast();
+        if (lastCommand.size() == 1) {
+          String first = lastCommand.get(0);
+          if (first.matches("^[A-Za-z][A-Za-z0-9]*=.*")) {
+            // This is a variable assignment
+            int equalSign = first.indexOf('=');
+            variableMap.put(first.substring(0, equalSign), first.substring(equalSign + 1));
+            resultBuffer.removeLast();
+          }
+        }
+      }
+      resultBuffer.add(newCommand);
+      return newCommand;
+    }
+
+    public List<String[]> tokenize(String commandLine)
+    {
+      LinkedList<List<String>> resultBuffer = new LinkedList<>();
+      List<String> commandBuffer = startNewCommand(resultBuffer);
+
+      if (commandLine != null) {
+        commandLine = ltrim(commandLine);
+        if (commandLine.startsWith("#")) {
+          return null;
+        }
+
+        int len = commandLine.length();
+        boolean insideQuotes = false;
+        boolean potentialEmptyArg = false;
+        StringBuffer buf = new StringBuffer(commandLine.length());
+
+        for (@SuppressWarnings("AssignmentToForLoopParameter") int i = 0; i < len; ++i) {
+          char c = commandLine.charAt(i);
+          if (c == '"') {
+            potentialEmptyArg = true;
+            insideQuotes = !insideQuotes;
+          } else if (c == '\\') {
+            if (len > i + 1) {
+              switch (commandLine.charAt(i + 1)) {
+                case 'n':
+                  buf.append("\n");
+                  break;
+                case 't':
+                  buf.append("\t");
+                  break;
+                case 'r':
+                  buf.append("\r");
+                  break;
+                case 'b':
+                  buf.append("\b");
+                  break;
+                case 'f':
+                  buf.append("\f");
+                  break;
+                default:
+                  buf.append(commandLine.charAt(i + 1));
+              }
+              ++i;
+            }
+          } else {
+            if (insideQuotes) {
+              buf.append(c);
+            } else {
+
+              if (c == '$') {
+                StringBuilder variableName = new StringBuilder(32);
+                if (len > i + 1) {
+                  if (commandLine.charAt(i + 1) == '{') {
+                    ++i;
+                    while (len > i + 1) {
+                      char ch = commandLine.charAt(i + 1);
+                      if (ch != '}') {
+                        variableName.append(ch);
+                      }
+                      ++i;
+                      if (ch == '}') {
+                        break;
+                      }
+                      if (len <= i + 1) {
+                        throw new CliException("Parse error: unmatched brace");
+                      }
+                    }
+                  } else if (commandLine.charAt(i + 1) == '?') {
+                    ++i;
+                    buf.append(lastCommandError ? "1" : "0");
+                    continue;
+                  } else {
+                    while (len > i + 1) {
+                      char ch = commandLine.charAt(i + 1);
+                      if ((variableName.length() > 0 && ch >= '0' && ch <= '9') || ((ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z'))) {
+                        variableName.append(ch);
+                      } else {
+                        break;
+                      }
+                      ++i;
+                    }
+                  }
+                  if (variableName.length() == 0) {
+                    buf.append(c);
+                  } else {
+                    String value = variableMap.get(variableName.toString());
+                    if (value != null) {
+                      buf.append(value);
+                    }
+                  }
+                } else {
+                  buf.append(c);
+                }
+              } else if (c == ';') {
+                appendToCommandBuffer(commandBuffer, buf, potentialEmptyArg);
+                commandBuffer = startNewCommand(resultBuffer);
+              } else if (Character.isWhitespace(c)) {
+                appendToCommandBuffer(commandBuffer, buf, potentialEmptyArg);
+                potentialEmptyArg = false;
+                if (len > i + 1 && commandLine.charAt(i + 1) == '#') {
+                  break;
+                }
+              } else {
+                buf.append(c);
+              }
+            }
+          }
+        }
+        appendToCommandBuffer(commandBuffer, buf, potentialEmptyArg);
+      }
+      startNewCommand(resultBuffer);
+      List<String[]> result = new ArrayList<>();
+      for (List<String> command : resultBuffer) {
+        String[] commandArray = new String[command.size()];
+        result.add(command.toArray(commandArray));
+      }
+      return result;
+    }
+
+  }
+
+  private interface Command
+  {
+    void execute(String[] args, ConsoleReader reader) throws Exception;
+
+  }
+
+  private static class Arg
+  {
+    final String name;
+
+    Arg(String name)
+    {
+      this.name = name;
+    }
+
+    @Override
+    public String toString()
+    {
+      return name;
+    }
+
+  }
+
+  private static class FileArg extends Arg
+  {
+    FileArg(String name)
+    {
+      super(name);
+    }
+
+  }
+
+  // VarArg must be in optional argument and must be at the end
+  private static class VarArg extends Arg
+  {
+    VarArg(String name)
+    {
+      super(name);
+    }
+  }
+
+  private static class CommandArg extends Arg
+  {
+    CommandArg(String name)
+    {
+      super(name);
+    }
+
+  }
+
+  protected PrintStream suppressOutput()
+  {
+    PrintStream originalStream = System.out;
+    if (raw) {
+      PrintStream dummyStream = new PrintStream(new OutputStream()
+      {
+        @Override
+        public void write(int b)
+        {
+          // no-op
+        }
+
+      });
+      System.setOut(dummyStream);
+    }
+    return originalStream;
+  }
+
+  protected void restoreOutput(PrintStream originalStream)
+  {
+    if (raw) {
+      System.setOut(originalStream);
+    }
+  }
+
+  AppPackage newAppPackageInstance(File f) throws IOException, ZipException
+  {
+    PrintStream outputStream = suppressOutput();
+    try {
+      return new AppPackage(f, true);
+    } finally {
+      restoreOutput(outputStream);
+    }
+  }
+
+  @SuppressWarnings("unused")
+  private StramAppLauncher getStramAppLauncher(String jarfileUri, Configuration config, boolean ignorePom) throws Exception
+  {
+    URI uri = new URI(jarfileUri);
+    String scheme = uri.getScheme();
+    StramAppLauncher appLauncher = null;
+    if (scheme == null || scheme.equals("file")) {
+      File jf = new File(uri.getPath());
+      appLauncher = new StramAppLauncher(jf, config);
+    } else {
+      try (FileSystem tmpFs = FileSystem.newInstance(uri, conf)) {
+        Path path = new Path(uri.getPath());
+        appLauncher = new StramAppLauncher(tmpFs, path, config);
+      }
+    }
+    if (appLauncher != null) {
+      if (verboseLevel > 0) {
+        System.err.print(appLauncher.getMvnBuildClasspathOutput());
+      }
+      return appLauncher;
+    } else {
+      throw new CliException("Scheme " + scheme + " not supported.");
+    }
+  }
+
+  private static class CommandSpec
+  {
+    Command command;
+    Arg[] requiredArgs;
+    Arg[] optionalArgs;
+    String description;
+
+    CommandSpec(Command command, Arg[] requiredArgs, Arg[] optionalArgs, String description)
+    {
+      this.command = command;
+      this.requiredArgs = requiredArgs;
+      this.optionalArgs = optionalArgs;
+      this.description = description;
+    }
+
+    void verifyArguments(String[] args) throws CliException
+    {
+      int minArgs = 0;
+      int maxArgs = 0;
+      if (requiredArgs != null) {
+        minArgs = requiredArgs.length;
+        maxArgs = requiredArgs.length;
+      }
+      if (optionalArgs != null) {
+        for (Arg arg : optionalArgs) {
+          if (arg instanceof VarArg) {
+            maxArgs = Integer.MAX_VALUE;
+            break;
+          } else {
+            maxArgs++;
+          }
+        }
+      }
+      if (args.length - 1 < minArgs || args.length - 1 > maxArgs) {
+        throw new CliException("Command parameter error");
+      }
+    }
+
+    void printUsage(String cmd)
+    {
+      System.err.print("Usage: " + cmd);
+      if (requiredArgs != null) {
+        for (Arg arg : requiredArgs) {
+          System.err.print(" <" + arg + ">");
+        }
+      }
+      if (optionalArgs != null) {
+        for (Arg arg : optionalArgs) {
+          if (arg instanceof VarArg) {
+            System.err.print(" [<" + arg + "> ... ]");
+          } else {
+            System.err.print(" [<" + arg + ">]");
+          }
+        }
+      }
+      System.err.println();
+    }
+
+  }
+
+  private static class OptionsCommandSpec extends CommandSpec
+  {
+    Options options;
+
+    OptionsCommandSpec(Command command, Arg[] requiredArgs, Arg[] optionalArgs, String description, Options options)
+    {
+      super(command, requiredArgs, optionalArgs, description);
+      this.options = options;
+    }
+
+    @Override
+    void verifyArguments(String[] args) throws CliException
+    {
+      try {
+        args = new PosixParser().parse(options, args).getArgs();
+        super.verifyArguments(args);
+      } catch (Exception ex) {
+        throw new CliException("Command parameter error");
+      }
+    }
+
+    @Override
+    void printUsage(String cmd)
+    {
+      super.printUsage(cmd + ((options == null) ? "" : " [options]"));
+      if (options != null) {
+        System.out.println("Options:");
+        HelpFormatter formatter = new HelpFormatter();
+        PrintWriter pw = new PrintWriter(System.out);
+        formatter.printOptions(pw, 80, options, 4, 4);
+        pw.flush();
+      }
+    }
+
+  }
+
+  ApexCli()
+  {
+    //
+    // Global command specification starts here
+    //
+    globalCommands.put("help", new CommandSpec(new HelpCommand(),
+        null,
+        new Arg[]{new CommandArg("command")},
+        "Show help"));
+    globalCommands.put("echo", new CommandSpec(new EchoCommand(),
+        null, new Arg[]{new VarArg("arg")},
+        "Echo the arguments"));
+    globalCommands.put("connect", new CommandSpec(new ConnectCommand(),
+        new Arg[]{new Arg("app-id")},
+        null,
+        "Connect to an app"));
+    globalCommands.put("launch", new OptionsCommandSpec(new LaunchCommand(),
+        new Arg[]{},
+        new Arg[]{new FileArg("jar-file/json-file/properties-file/app-package-file"), new Arg("matching-app-name")},
+        "Launch an app", LAUNCH_OPTIONS.options));
+    globalCommands.put("shutdown-app", new CommandSpec(new ShutdownAppCommand(),
+        new Arg[]{new Arg("app-id")},
+        new Arg[]{new VarArg("app-id")},
+        "Shutdown an app"));
+    globalCommands.put("list-apps", new CommandSpec(new ListAppsCommand(),
+        null,
+        new Arg[]{new Arg("pattern")},
+        "List applications"));
+    globalCommands.put("kill-app", new CommandSpec(new KillAppCommand(),
+        new Arg[]{new Arg("app-id")},
+        new Arg[]{new VarArg("app-id")},
+        "Kill an app"));
+    globalCommands.put("show-logical-plan", new OptionsCommandSpec(new ShowLogicalPlanCommand(),
+        new Arg[]{new FileArg("jar-file/app-package-file")},
+        new Arg[]{new Arg("class-name")},
+        "List apps in a jar or show logical plan of an app class",
+        getShowLogicalPlanCommandLineOptions()));
+
+    globalCommands.put("get-jar-operator-classes", new OptionsCommandSpec(new GetJarOperatorClassesCommand(),
+        new Arg[]{new FileArg("jar-files-comma-separated")},
+        new Arg[]{new Arg("search-term")},
+        "List operators in a jar list",
+        GET_OPERATOR_CLASSES_OPTIONS.options));
+
+    globalCommands.put("get-jar-operator-properties", new CommandSpec(new GetJarOperatorPropertiesCommand(),
+        new Arg[]{new FileArg("jar-files-comma-separated"), new Arg("operator-class-name")},
+        null,
+        "List properties in specified operator"));
+
+    globalCommands.put("alias", new CommandSpec(new AliasCommand(),
+        new Arg[]{new Arg("alias-name"), new CommandArg("command")},
+        null,
+        "Create a command alias"));
+    globalCommands.put("source", new CommandSpec(new SourceCommand(),
+        new Arg[]{new FileArg("file")},
+        null,
+        "Execute the commands in a file"));
+    globalCommands.put("exit", new CommandSpec(new ExitCommand(),
+        null,
+        null,
+        "Exit the CLI"));
+    globalCommands.put("begin-macro", new CommandSpec(new BeginMacroCommand(),
+        new Arg[]{new Arg("name")},
+        null,
+        "Begin Macro Definition ($1...$9 to access parameters and type 'end' to end the definition)"));
+    globalCommands.put("dump-properties-file", new CommandSpec(new DumpPropertiesFileCommand(),
+        new Arg[]{new FileArg("out-file"), new FileArg("jar-file"), new Arg("app-name")},
+        null,
+        "Dump the properties file of an app class"));
+    globalCommands.put("get-app-info", new CommandSpec(new GetAppInfoCommand(),
+        new Arg[]{new Arg("app-id")},
+        null,
+        "Get the information of an app"));
+    globalCommands.put("set-pager", new CommandSpec(new SetPagerCommand(),
+        new Arg[]{new Arg("on/off")},
+        null,
+        "Set the pager program for output"));
+    globalCommands.put("get-config-parameter", new CommandSpec(new GetConfigParameterCommand(),
+        null,
+        new Arg[]{new FileArg("parameter-name")},
+        "Get the configuration parameter"));
+    globalCommands.put("get-app-package-info", new CommandSpec(new GetAppPackageInfoCommand(),
+        new Arg[]{new FileArg("app-package-file")},
+        null,
+        "Get info on the app package file"));
+    globalCommands.put("get-app-package-operators", new OptionsCommandSpec(new GetAppPackageOperatorsCommand(),
+        new Arg[]{new FileArg("app-package-file")},
+        new Arg[]{new Arg("search-term")},
+        "Get operators within the given app package",
+        GET_OPERATOR_CLASSES_OPTIONS.options));
+    globalCommands.put("get-app-package-operator-properties", new CommandSpec(new GetAppPackageOperatorPropertiesCommand(),
+        new Arg[]{new FileArg("app-package-file"), new Arg("operator-class")},
+        null,
+        "Get operator properties within the given app package"));
+    globalCommands.put("list-default-app-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.APPLICATION),
+        null, null, "Lists the default application attributes"));
+    globalCommands.put("list-default-operator-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.OPERATOR),
+        null, null, "Lists the default operator attributes"));
+    globalCommands.put("list-default-port-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.PORT),
+        null, null, "Lists the default port attributes"));
+    globalCommands.put("clean-app-directories", new CommandSpec(new CleanAppDirectoriesCommand(),
+        new Arg[]{new Arg("duration-in-millis")},
+        null,
+        "Clean up data directories of applications that terminated the given milliseconds ago"));
+
+    //
+    // Connected command specification starts here
+    //
+    connectedCommands.put("list-containers", new CommandSpec(new ListContainersCommand(),
+        null,
+        null,
+        "List containers"));
+    connectedCommands.put("list-operators", new CommandSpec(new ListOperatorsCommand(),
+        null,
+        new Arg[]{new Arg("pattern")},
+        "List operators"));
+    connectedCommands.put("show-physical-plan", new CommandSpec(new ShowPhysicalPlanCommand(),
+        null,
+        null,
+        "Show physical plan"));
+    connectedCommands.put("kill-container", new CommandSpec(new KillContainerCommand(),
+        new Arg[]{new Arg("container-id")},
+        new Arg[]{new VarArg("container-id")},
+        "Kill a container"));
+    connectedCommands.put("shutdown-app", new CommandSpec(new ShutdownAppCommand(),
+        null,
+        new Arg[]{new VarArg("app-id")},
+        "Shutdown an app"));
+    connectedCommands.put("kill-app", new CommandSpec(new KillAppCommand(),
+        null,
+        new Arg[]{new VarArg("app-id")},
+        "Kill an app"));
+    connectedCommands.put("wait", new CommandSpec(new WaitCommand(),
+        new Arg[]{new Arg("timeout")},
+        null,
+        "Wait for completion of current application"));
+    connectedCommands.put("start-recording", new CommandSpec(new StartRecordingCommand(),
+        new Arg[]{new Arg("operator-id")},
+        new Arg[]{new Arg("port-name"), new Arg("num-windows")},
+        "Start recording"));
+    connectedCommands.put("stop-recording", new CommandSpec(new StopRecordingCommand(),
+        new Arg[]{new Arg("operator-id")},
+        new Arg[]{new Arg("port-name")},
+        "Stop recording"));
+    connectedCommands.put("get-operator-attributes", new CommandSpec(new GetOperatorAttributesCommand(),
+        new Arg[]{new Arg("operator-name")},
+        new Arg[]{new Arg("attribute-name")},
+        "Get attributes of an operator"));
+    connectedCommands.put("get-operator-properties", new CommandSpec(new GetOperatorPropertiesCommand(),
+        new Arg[]{new Arg("operator-name")},
+        new Arg[]{new Arg("property-name")},
+        "Get properties of a logical operator"));
+    connectedCommands.put("get-physical-operator-properties", new OptionsCommandSpec(new GetPhysicalOperatorPropertiesCommand(),
+        new Arg[]{new Arg("operator-id")},
+        null,
+        "Get properties of a physical operator", GET_PHYSICAL_PROPERTY_OPTIONS.options));
+
+    connectedCommands.put("set-operator-property", new CommandSpec(new SetOperatorPropertyCommand(),
+        new Arg[]{new Arg("operator-name"), new Arg("property-name"), new Arg("property-value")},
+        null,
+        "Set a property of an operator"));
+    connectedCommands.put("set-physical-operator-property", new CommandSpec(new SetPhysicalOperatorPropertyCommand(),
+        new Arg[]{new Arg("operator-id"), new Arg("property-name"), new Arg("property-value")},
+        null,
+        "Set a property of an operator"));
+    connectedCommands.put("get-app-attributes", new CommandSpec(new GetAppAttributesCommand(),
+        null,
+        new Arg[]{new Arg("attribute-name")},
+        "Get attributes of the connected app"));
+    connectedCommands.put("get-port-attributes", new CommandSpec(new GetPortAttributesCommand(),
+        new Arg[]{new Arg("operator-name"), new Arg("port-name")},
+        new Arg[]{new Arg("attribute-name")},
+        "Get attributes of a port"));
+    connectedCommands.put("begin-logical-plan-change", new CommandSpec(new BeginLogicalPlanChangeCommand(),
+        null,
+        null,
+        "Begin Logical Plan Change"));
+    connectedCommands.put("show-logical-plan", new OptionsCommandSpec(new ShowLogicalPlanCommand(),
+        null,
+        new Arg[]{new FileArg("jar-file/app-package-file"), new Arg("class-name")},
+        "Show logical plan of an app class",
+        getShowLogicalPlanCommandLineOptions()));
+    connectedCommands.put("dump-properties-file", new CommandSpec(new DumpPropertiesFileCommand(),
+        new Arg[]{new FileArg("out-file")},
+        new Arg[]{new FileArg("jar-file"), new Arg("class-name")},
+        "Dump the properties file of an app class"));
+    connectedCommands.put("get-app-info", new CommandSpec(new GetAppInfoCommand(),
+        null,
+        new Arg[]{new Arg("app-id")},
+        "Get the information of an app"));
+    connectedCommands.put("get-recording-info", new CommandSpec(new GetRecordingInfoCommand(),
+        null,
+        new Arg[]{new Arg("operator-id"), new Arg("start-time")},
+        "Get tuple recording info"));
+
+    //
+    // Logical plan change command specification starts here
+    //
+    logicalPlanChangeCommands.put("help", new CommandSpec(new HelpCommand(),
+        null,
+        new Arg[]{new Arg("command")},
+        "Show help"));
+    logicalPlanChangeCommands.put("create-operator", new CommandSpec(new CreateOperatorCommand(),
+        new Arg[]{new Arg("operator-name"), new Arg("class-name")},
+        null,
+        "Create an operator"));
+    logicalPlanChangeCommands.put("create-stream", new CommandSpec(new CreateStreamCommand(),
+        new Arg[]{new Arg("stream-name"), new Arg("from-operator-name"), new Arg("from-port-name"), new Arg("to-operator-name"), new Arg("to-port-name")},
+        null,
+        "Create a stream"));
+    logicalPlanChangeCommands.put("add-stream-sink", new CommandSpec(new AddStreamSinkCommand(),
+        new Arg[]{new Arg("stream-name"), new Arg("to-operator-name"), new Arg("to-port-name")},
+        null,
+        "Add a sink to an existing stream"));
+    logicalPlanChangeCommands.put("remove-operator", new CommandSpec(new RemoveOperatorCommand(),
+        new Arg[]{new Arg("operator-name")},
+        null,
+        "Remove an operator"));
+    logicalPlanChangeCommands.put("remove-stream", new CommandSpec(new RemoveStreamCommand(),
+        new Arg[]{new Arg("stream-name")},
+        null,
+        "Remove a stream"));
+    logicalPlanChangeCommands.put("set-operator-property", new CommandSpec(new SetOperatorPropertyCommand(),
+        new Arg[]{new Arg("operator-name"), new Arg("property-name"), new Arg("property-value")},
+        null,
+        "Set a property of an operator"));
+    logicalPlanChangeCommands.put("set-operator-attribute", new CommandSpec(new SetOperatorAttributeCommand(),
+        new Arg[]{new Arg("operator-name"), new Arg("attr-name"), new Arg("attr-value")},
+        null,
+        "Set an attribute of an operator"));
+    logicalPlanChangeCommands.put("set-port-attribute", new CommandSpec(new SetPortAttributeCommand(),
+        new Arg[]{new Arg("operator-name"), new Arg("port-name"), new Arg("attr-name"), new Arg("attr-value")},
+        null,
+        "Set an attribute of a port"));
+    logicalPlanChangeCommands.put("set-stream-attribute", new CommandSpec(new SetStreamAttributeCommand(),
+        new Arg[]{new Arg("stream-name"), new Arg("attr-name"), new Arg("attr-value")},
+        null,
+        "Set an attribute of a stream"));
+    logicalPlanChangeCommands.put("show-queue", new CommandSpec(new ShowQueueCommand(),
+        null,
+        null,
+        "Show the queue of the plan change"));
+    logicalPlanChangeCommands.put("submit", new CommandSpec(new SubmitCommand(),
+        null,
+        null,
+        "Submit the plan change"));
+    logicalPlanChangeCommands.put("abort", new CommandSpec(new AbortCommand(),
+        null,
+        null,
+        "Abort the plan change"));
+  }
+
+  private void printJson(String json) throws IOException
+  {
+    PrintStream os = getOutputPrintStream();
+
+    if (jsonp != null) {
+      os.println(jsonp + "(" + json + ");");
+    } else {
+      os.println(json);
+    }
+    os.flush();
+    closeOutputPrintStream(os);
+  }
+
+  private void printJson(JSONObject json) throws JSONException, IOException
+  {
+    printJson(raw ? json.toString() : json.toString(2));
+  }
+
+  private void printJson(JSONArray jsonArray, String name) throws JSONException, IOException
+  {
+    JSONObject json = new JSONObject();
+    json.put(name, jsonArray);
+    printJson(json);
+  }
+
+  private <K, V> void printJson(Map<K, V> map) throws IOException, JSONException
+  {
+    printJson(new JSONObject(mapper.writeValueAsString(map)));
+  }
+
+  private <T> void printJson(List<T> list, String name) throws IOException, JSONException
+  {
+    printJson(new JSONArray(mapper.writeValueAsString(list)), name);
+  }
+
+  private PrintStream getOutputPrintStream() throws IOException
+  {
+    if (pagerCommand == null) {
+      pagerProcess = null;
+      return System.out;
+    } else {
+      pagerProcess = Runtime.getRuntime().exec(new String[]{"sh", "-c",
+        pagerCommand + " >/dev/tty"});
+      return new PrintStream(pagerProcess.getOutputStream());
+    }
+  }
+
+  private void closeOutputPrintStream(PrintStream os)
+  {
+    if (os != System.out) {
+      os.close();
+      try {
+        pagerProcess.waitFor();
+      } catch (InterruptedException ex) {
+        LOG.debug("Interrupted");
+      }
+    }
+  }
+
+  private static String expandFileName(String fileName, boolean expandWildCard) throws IOException
+  {
+    if (fileName.matches("^[a-zA-Z]+:.*")) {
+      // it's a URL
+      return fileName;
+    }
+
+    // TODO: need to work with other users' home directory
+    if (fileName.startsWith("~" + File.separator)) {
+      fileName = System.getProperty("user.home") + fileName.substring(1);
+    }
+    fileName = new File(fileName).getCanonicalPath();
+    //LOG.debug("Canonical path: {}", fileName);
+    if (expandWildCard) {
+      DirectoryScanner scanner = new DirectoryScanner();
+      scanner.setIncludes(new String[]{fileName});
+      scanner.scan();
+      String[] files = scanner.getIncludedFiles();
+
+      if (files.length == 0) {
+        throw new CliException(fileName + " does not match any file");
+      } else if (files.length > 1) {
+        throw new CliException(fileName + " matches more than one file");
+      }
+      return files[0];
+    } else {
+      return fileName;
+    }
+  }
+
+  private static String[] expandFileNames(String fileName) throws IOException
+  {
+    // TODO: need to work with other users
+    if (fileName.matches("^[a-zA-Z]+:.*")) {
+      // it's a URL
+      return new String[]{fileName};
+    }
+    if (fileName.startsWith("~" + File.separator)) {
+      fileName = System.getProperty("user.home") + fileName.substring(1);
+    }
+    fileName = new File(fileName).getCanonicalPath();
+    LOG.debug("Canonical path: {}", fileName);
+    DirectoryScanner scanner = new DirectoryScanner();
+    scanner.setIncludes(new String[]{fileName});
+    scanner.scan();
+    return scanner.getIncludedFiles();
+  }
+
+  private static String expandCommaSeparatedFiles(String filenames) throws IOException
+  {
+    String[] entries = filenames.split(",");
+    StringBuilder result = new StringBuilder(filenames.length());
+    for (String entry : entries) {
+      for (String file : expandFileNames(entry)) {
+        if (result.length() > 0) {
+          result.append(",");
+        }
+        result.append(file);
+      }
+    }
+    if (result.length() == 0) {
+      return null;
+    }
+    return result.toString();
+  }
+
+  protected ApplicationReport getApplication(String appId)
+  {
+    List<ApplicationReport> appList = getApplicationList();
+    if (StringUtils.isNumeric(appId)) {
+      int appSeq = Integer.parseInt(appId);
+      for (ApplicationReport ar : appList) {
+        if (ar.getApplicationId().getId() == appSeq) {
+          return ar;
+        }
+      }
+    } else {
+      for (ApplicationReport ar : appList) {
+        if (ar.getApplicationId().toString().equals(appId)) {
+          return ar;
+        }
+      }
+    }
+    return null;
+  }
+
+  private static class CliException extends RuntimeException
+  {
+    private static final long serialVersionUID = 1L;
+
+    CliException(String msg, Throwable cause)
+    {
+      super(msg, cause);
+    }
+
+    CliException(String msg)
+    {
+      super(msg);
+    }
+
+  }
+
+  public void preImpersonationInit(String[] args) throws IOException
+  {
+    Signal.handle(new Signal("INT"), new SignalHandler()
+    {
+      @Override
+      public void handle(Signal sig)
+      {
+        System.out.println("^C");
+        if (commandThread != null) {
+          commandThread.interrupt();
+          mainThread.interrupt();
+        } else {
+          System.out.print(prompt);
+          System.out.flush();
+        }
+      }
+    });
+    consolePresent = (System.console() != null);
+    Options options = new Options();
+    options.addOption("e", true, "Commands are read from the argument");
+    options.addOption("v", false, "Verbose mode level 1");
+    options.addOption("vv", false, "Verbose mode level 2");
+    options.addOption("vvv", false, "Verbose mode level 3");
+    options.addOption("vvvv", false, "Verbose mode level 4");
+    options.addOption("r", false, "JSON Raw mode");
+    options.addOption("p", true, "JSONP padding function");
+    options.addOption("h", false, "Print this help");
+    options.addOption("f", true, "Use the specified prompt at all time");
+    options.addOption("kp", true, "Use the specified kerberos principal");
+    options.addOption("kt", true, "Use the specified kerberos keytab");
+
+    CommandLineParser parser = new BasicParser();
+    try {
+      CommandLine cmd = parser.parse(options, args);
+      if (cmd.hasOption("v")) {
+        verboseLevel = 1;
+      }
+      if (cmd.hasOption("vv")) {
+        verboseLevel = 2;
+      }
+      if (cmd.hasOption("vvv")) {
+        verboseLevel = 3;
+      }
+      if (cmd.hasOption("vvvv")) {
+        verboseLevel = 4;
+      }
+      if (cmd.hasOption("r")) {
+        raw = true;
+      }
+      if (cmd.hasOption("e")) {
+        commandsToExecute = cmd.getOptionValues("e");
+        consolePresent = false;
+      }
+      if (cmd.hasOption("p")) {
+        jsonp = cmd.getOptionValue("p");
+      }
+      if (cmd.hasOption("f")) {
+        forcePrompt = cmd.getOptionValue("f");
+      }
+      if (cmd.hasOption("h")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(ApexCli.class.getSimpleName(), options);
+        System.exit(0);
+      }
+      if (cmd.hasOption("kp")) {
+        kerberosPrincipal = cmd.getOptionValue("kp");
+      }
+      if (cmd.hasOption("kt")) {
+        kerberosKeyTab = cmd.getOptionValue("kt");
+      }
+    } catch (ParseException ex) {
+      System.err.println("Invalid argument: " + ex);
+      System.exit(1);
+    }
+
+    if (kerberosPrincipal == null && kerberosKeyTab != null) {
+      System.err.println("Kerberos key tab is specified but not the kerberos principal. Please specify it using the -kp option.");
+      System.exit(1);
+    }
+    if (kerberosPrincipal != null && kerberosKeyTab == null) {
+      System.err.println("Kerberos principal is specified but not the kerberos key tab. Please specify it using the -kt option.");
+      System.exit(1);
+    }
+
+    Level logLevel;
+    switch (verboseLevel) {
+      case 0:
+        logLevel = Level.OFF;
+        break;
+      case 1:
+        logLevel = Level.ERROR;
+        break;
+      case 2:
+        logLevel = Level.WARN;
+        break;
+      case 3:
+        logLevel = Level.INFO;
+        break;
+      default:
+        logLevel = Level.DEBUG;
+        break;
+    }
+
+    for (org.apache.log4j.Logger logger : new org.apache.log4j.Logger[]{
+        org.apache.log4j.Logger.getRootLogger(),
+        org.apache.log4j.Logger.getLogger(ApexCli.class)
+    }) {
+      @SuppressWarnings("unchecked")
+      Enumeration<Appender> allAppenders = logger.getAllAppenders();
+      while (allAppenders.hasMoreElements()) {
+        Appender appender = allAppenders.nextElement();
+        if (appender instanceof ConsoleAppender) {
+          ((ConsoleAppender)appender).setThreshold(logLevel);
+        }
+      }
+    }
+
+    if (commandsToExecute != null) {
+      for (String command : commandsToExecute) {
+        LOG.debug("Command to be executed: {}", command);
+      }
+    }
+    if (kerberosPrincipal != null && kerberosKeyTab != null) {
+      StramUserLogin.authenticate(kerberosPrincipal, kerberosKeyTab);
+    } else {
+      Configuration config = new YarnConfiguration();
+      StramClientUtils.addDTLocalResources(config);
+      StramUserLogin.attemptAuthentication(config);
+    }
+  }
+
+  public void init() throws IOException
+  {
+    conf = StramClientUtils.addDTSiteResources(new YarnConfiguration());
+    fs = StramClientUtils.newFileSystemInstance(conf);
+    stramAgent = new StramAgent(fs, conf);
+
+    yarnClient.init(conf);
+    yarnClient.start();
+    LOG.debug("Yarn Client initialized and started");
+    String socks = conf.get(CommonConfigurationKeysPublic.HADOOP_SOCKS_SERVER_KEY);
+    if (socks != null) {
+      int colon = socks.indexOf(':');
+      if (colon > 0) {
+        LOG.info("Using socks proxy at {}", socks);
+        System.setProperty("socksProxyHost", socks.substring(0, colon));
+        System.setProperty("socksProxyPort", socks.substring(colon + 1));
+      }
+    }
+  }
+
+  private void processSourceFile(String fileName, ConsoleReader reader) throws FileNotFoundException, IOException
+  {
+    fileName = expandFileName(fileName, true);
+    LOG.debug("Sourcing {}", fileName);
+    boolean consolePresentSaved = consolePresent;
+    consolePresent = false;
+    FileLineReader fr = null;
+    String line;
+    try {
+      fr = new FileLineReader(fileName);
+      while ((line = fr.readLine("")) != null) {
+        processLine(line, fr, true);
+      }
+    } finally {
+      consolePresent = consolePresentSaved;
+      if (fr != null) {
+        fr.close();
+      }
+    }
+  }
+
+  private static final class MyNullCompleter implements Completer
+  {
+    public static final MyNullCompleter INSTANCE = new MyNullCompleter();
+
+    @Override
+    public int complete(final String buffer, final int cursor, final List<CharSequence> candidates)
+    {
+      candidates.add("");
+      return cursor;
+    }
+
+  }
+
+  private static final class MyFileNameCompleter extends FileNameCompleter
+  {
+    @Override
+    public int complete(final String buffer, final int cursor, final List<CharSequence> candidates)
+    {
+      int result = super.complete(buffer, cursor, candidates);
+      if (candidates.isEmpty()) {
+        candidates.add("");
+        result = cursor;
+      }
+      return result;
+    }
+
+  }
+
+  private List<Completer> defaultCompleters()
+  {
+    Map<String, CommandSpec> commands = new TreeMap<>();
+
+    commands.putAll(logicalPlanChangeCommands);
+    commands.putAll(connectedCommands);
+    commands.putAll(globalCommands);
+
+    List<Completer> completers = new LinkedList<>();
+    for (Map.Entry<String, CommandSpec> entry : commands.entrySet()) {
+      String command = entry.getKey();
+      CommandSpec cs = entry.getValue();
+      List<Completer> argCompleters = new LinkedList<>();
+      argCompleters.add(new StringsCompleter(command));
+      Arg[] args = (Arg[])ArrayUtils.addAll(cs.requiredArgs, cs.optionalArgs);
+      if (args != null) {
+        if (cs instanceof OptionsCommandSpec) {
+          // ugly hack because jline cannot dynamically change completer while user types
+          if (args[0] instanceof FileArg || args[0] instanceof VarArg) {
+            for (int i = 0; i < 10; i++) {
+              argCompleters.add(new MyFileNameCompleter());
+            }
+          }
+        } else {
+          for (Arg arg : args) {
+            if (arg instanceof FileArg || arg instanceof VarArg) {
+              argCompleters.add(new MyFileNameCompleter());
+            } else if (arg instanceof CommandArg) {
+              argCompleters.add(new StringsCompleter(commands.keySet().toArray(new String[]{})));
+            } else {
+              argCompleters.add(MyNullCompleter.INSTANCE);
+            }
+          }
+        }
+      }
+
+      completers.add(new ArgumentCompleter(argCompleters));
+    }
+
+    List<Completer> argCompleters = new LinkedList<>();
+    Set<String> set = new TreeSet<>();
+    set.addAll(aliases.keySet());
+    set.addAll(macros.keySet());
+    argCompleters.add(new StringsCompleter(set.toArray(new String[]{})));
+    for (int i = 0; i < 10; i++) {
+      argCompleters.add(new MyFileNameCompleter());
+    }
+    completers.add(new ArgumentCompleter(argCompleters));
+    return completers;
+  }
+
+  private void setupCompleter(ConsoleReader reader)
+  {
+    reader.addCompleter(new AggregateCompleter(defaultCompleters()));
+  }
+
+  private void updateCompleter(ConsoleReader reader)
+  {
+    List<Completer> completers = new ArrayList<>(reader.getCompleters());
+    for (Completer c : completers) {
+      reader.removeCompleter(c);
+    }
+    setupCompleter(reader);
+  }
+
+  private void setupHistory(ConsoleReader reader)
+  {
+    File historyFile = new File(StramClientUtils.getUserDTDirectory(), "cli_history");
+    historyFile.getParentFile().mkdirs();
+    try {
+      topLevelHistory = new FileHistory(historyFile);
+      reader.setHistory(topLevelHistory);
+      historyFile = new File(StramClientUtils.getUserDTDirectory(), "cli_history_clp");
+      changingLogicalPlanHistory = new FileHistory(historyFile);
+    } catch (IOException ex) {
+      System.err.printf("Unable to open %s for writing.", historyFile);
+    }
+  }
+
+  private void setupAgents() throws IOException
+  {
+    recordingsAgent = new RecordingsAgent(stramAgent);
+  }
+
+  public void run() throws IOException
+  {
+    ConsoleReader reader = new ConsoleReader();
+    reader.setExpandEvents(false);
+    reader.setBellEnabled(false);
+    try {
+      processSourceFile(StramClientUtils.getConfigDir() + "/clirc_system", reader);
+    } catch (Exception ex) {
+      // ignore
+    }
+    try {
+      processSourceFile(StramClientUtils.getUserDTDirectory() + "/clirc", reader);
+    } catch (Exception ex) {
+      // ignore
+    }
+    if (consolePresent) {
+      printWelcomeMessage();
+      setupCompleter(reader);
+      setupHistory(reader);
+      //reader.setHandleUserInterrupt(true);
+    } else {
+      reader.setEchoCharacter((char)0);
+    }
+    setupAgents();
+    String line;
+    PrintWriter out = new PrintWriter(System.out);
+    int i = 0;
+    while (true) {
+      if (commandsToExecute != null) {
+        if (i >= commandsToExecute.length) {
+          break;
+        }
+        line = commandsToExecute[i++];
+      } else {
+        line = readLine(reader);
+        if (line == null) {
+          break;
+        }
+      }
+      processLine(line, reader, true);
+      out.flush();
+    }
+    if (topLevelHistory != null) {
+      try {
+        topLevelHistory.flush();
+      } catch (IOException ex) {
+        LOG.warn("Cannot flush command history", ex);
+      }
+    }
+    if (changingLogicalPlanHistory != null) {
+      try {
+        changingLogicalPlanHistory.flush();
+      } catch (IOException ex) {
+        LOG.warn("Cannot flush command history", ex);
+      }
+    }
+    if (consolePresent) {
+      System.out.println("exit");
+    }
+  }
+
+  private List<String> expandMacro(List<String> lines, String[] args)
+  {
+    List<String> expandedLines = new ArrayList<>();
+
+    for (String line : lines) {
+      int previousIndex = 0;
+      StringBuilder expandedLine = new StringBuilder(line.length());
+      while (true) {
+        // Search for $0..$9 within the each line and replace by corresponding args
+        int currentIndex = line.indexOf('$', previousIndex);
+        if (currentIndex > 0 && line.length() > currentIndex + 1) {
+          int argIndex = line.charAt(currentIndex + 1) - '0';
+          if (args.length > argIndex && argIndex >= 0) {
+            // Replace $0 with macro name or $1..$9 with input arguments
+            expandedLine.append(line.substring(previousIndex, currentIndex)).append(args[argIndex]);
+          } else if (argIndex >= 0 && argIndex <= 9) {
+            // Arguments for $1..$9 were not supplied - replace with empty strings
+            expandedLine.append(line.substring(previousIndex, currentIndex));
+          } else {
+            // Outside valid arguments range - ignore and do not replace
+            expandedLine.append(line.substring(previousIndex, currentIndex + 2));
+          }
+          currentIndex += 2;
+        } else {
+          expandedLine.append(line.substring(previousIndex));
+          expandedLines.add(expandedLine.toString());
+          break;
+        }
+        previousIndex = currentIndex;
+      }
+    }
+    return expandedLines;
+  }
+
+  private static String ltrim(String s)
+  {
+    int i = 0;
+    while (i < s.length() && Character.isWhitespace(s.charAt(i))) {
+      i++;
+    }
+    return s.substring(i);
+  }
+
+  private void processLine(String line, final ConsoleReader reader, boolean expandMacroAlias)
+  {
+    try {
+      // clear interrupt flag
+      Thread.interrupted();
+      if (reader.isHistoryEnabled()) {
+        History history = reader.getHistory();
+        if (history instanceof FileHistory) {
+          try {
+            ((FileHistory)history).flush();
+          } catch (IOException ex) {
+            // ignore
+          }
+        }
+      }
+      //LOG.debug("line: \"{}\"", line);
+      List<String[]> commands = tokenizer.tokenize(line);
+      if (commands == null) {
+        return;
+      }
+      for (final String[] args : commands) {
+        if (args.length == 0 || StringUtils.isBlank(args[0])) {
+          continue;
+        }
+        //LOG.debug("Got: {}", mapper.writeValueAsString(args));
+        if (expandMacroAlias) {
+          if (macros.containsKey(args[0])) {
+            List<String> macroItems = expandMacro(macros.get(args[0]), args);
+            for (String macroItem : macroItems) {
+              if (consolePresent) {
+                System.out.println("expanded-macro> " + macroItem);
+              }
+              processLine(macroItem, reader, false);
+            }
+            continue;
+          }
+
+          if (aliases.containsKey(args[0])) {
+            processLine(aliases.get(args[0]), reader, false);
+            continue;
+          }
+        }
+        CommandSpec cs = null;
+        if (changingLogicalPlan) {
+          cs = logicalPlanChangeCommands.get(args[0]);
+        } else {
+          if (currentApp != null) {
+            cs = connectedCommands.get(args[0]);
+          }
+          if (cs == null) {
+            cs = globalCommands.get(args[0]);
+          }
+        }
+        if (cs == null) {
+          if (connectedCommands.get(args[0]) != null) {
+            System.err.println("\"" + args[0] + "\" is valid only when connected to an application. Type \"connect <appid>\" to connect to an application.");
+            lastCommandError = true;
+          } else if (logicalPlanChangeCommands.get(args[0]) != null) {
+            System.err.println("\"" + args[0] + "\" is valid only when changing a logical plan.  Type \"begin-logical-plan-change\" to change a logical plan");
+            lastCommandError = true;
+          } else {
+            System.err.println("Invalid command '" + args[0] + "'. Type \"help\" for list of commands");
+            lastCommandError = true;
+          }
+        } else {
+          try {
+            cs.verifyArguments(args);
+          } catch (CliException ex) {
+            cs.printUsage(args[0]);
+            throw ex;
+          }
+          final Command command = cs.command;
+          commandThread = new Thread()
+          {
+            @Override
+            public void run()
+            {
+              try {
+                command.execute(args, reader);
+                lastCommandError = false;
+              } catch (Exception e) {
+                handleException(e);
+              } catch (Error e) {
+                handleException(e);
+                System.err.println("Fatal error encountered");
+                System.exit(1);
+              }
+            }
+
+          };
+          mainThread = Thread.currentThread();
+          commandThread.start();
+          try {
+            commandThread.join();
+          } catch (InterruptedException ex) {
+            System.err.println("Interrupted");
+          }
+          commandThread = null;
+        }
+      }
+    } catch (Exception e) {
+      handleException(e);
+    }
+  }
+
+  private void handleException(Throwable e)
+  {
+    System.err.println(ExceptionUtils.getFullStackTrace(e));
+    LOG.error("Exception caught: ", e);
+    lastCommandError = true;
+  }
+
+  private void printWelcomeMessage()
+  {
+    VersionInfo v = VersionInfo.APEX_VERSION;
+    System.out.println("Apex CLI " + v.getVersion() + " " + v.getDate() + " " + v.getRevision());
+  }
+
+  private void printHelp(String command, CommandSpec commandSpec, PrintStream os)
+  {
+    if (consolePresent) {
+      os.print("\033[0;93m");
+      os.print(command);
+      os.print("\033[0m");
+    } else {
+      os.print(command);
+    }
+    if (commandSpec instanceof OptionsCommandSpec) {
+      OptionsCommandSpec ocs = (OptionsCommandSpec)commandSpec;
+      if (ocs.options != null) {
+        os.print(" [options]");
+      }
+    }
+    if (commandSpec.requiredArgs != null) {
+      for (Arg arg : commandSpec.requiredArgs) {
+        if (consolePresent) {
+          os.print(" \033[3m" + arg + "\033[0m");
+        } else {
+          os.print(" <" + arg + ">");
+        }
+      }
+    }
+    if (commandSpec.optionalArgs != null) {
+      for (Arg arg : commandSpec.optionalArgs) {
+        if (consolePresent) {
+          os.print(" [\033[3m" + arg + "\033[0m");
+        } else {
+          os.print(" [<" + arg + ">");
+        }
+        if (arg instanceof VarArg) {
+          os.print(" ...");
+        }
+        os.print("]");
+      }
+    }
+    os.println("\n\t" + commandSpec.description);
+    if (commandSpec instanceof OptionsCommandSpec) {
+      OptionsCommandSpec ocs = (OptionsCommandSpec)commandSpec;
+      if (ocs.options != null) {
+        os.println("\tOptions:");
+        HelpFormatter formatter = new HelpFormatter();
+        PrintWriter pw = new PrintWriter(os);
+        formatter.printOptions(pw, 80, ocs.options, 12, 4);
+        pw.flush();
+      }
+    }
+  }
+
+  private void printHelp(Map<String, CommandSpec> commandSpecs, PrintStream os)
+  {
+    for (Map.Entry<String, CommandSpec> entry : commandSpecs.entrySet()) {
+      printHelp(entry.getKey(), entry.getValue(), os);
+    }
+  }
+
+  private String readLine(ConsoleReader reader)
+    throws IOException
+  {
+    if (forcePrompt == null) {
+      prompt = "";
+      if (consolePresent) {
+        if (changingLogicalPlan) {
+          prompt = "logical-plan-change";
+        } else {
+          prompt = "apex";
+        }
+        if (currentApp != null) {
+          prompt += " (";
+          prompt += currentApp.getApplicationId().toString();
+          prompt += ") ";
+        }
+        prompt += "> ";
+      }
+    } else {
+      prompt = forcePrompt;
+    }
+    String line = reader.readLine(prompt, consolePresent ? null : (char)0);
+    if (line == null) {
+      return null;
+    }
+    return ltrim(line);
+  }
+
+  private List<ApplicationReport> getApplicationList()
+  {
+    try {
+      return yarnClient.getApplications(Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE));
+    } catch (Exception e) {
+      throw new CliException("Error getting application list from resource manager", e);
+    }
+  }
+
+  private String getContainerLongId(String containerId)
+  {
+    JSONObject json = getResource(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS, currentApp);
+    int shortId = 0;
+    if (StringUtils.isNumeric(containerId)) {
+      shortId = Integer.parseInt(containerId);
+    }
+    try {
+      Object containersObj = json.get("containers");
+      JSONArray containers;
+      if (containersObj instanceof JSONArray) {
+        containers = (JSONArray)containersObj;
+      } else {
+        containers = new JSONArray();
+        containers.put(containersObj);
+      }
+      if (containersObj != null) {
+        for (int o = containers.length(); o-- > 0; ) {
+          JSONObject container = containers.getJSONObject(o);
+          String id = container.getString("id");
+          if (id.equals(containerId) || (shortId != 0 && (id.endsWith("_" + shortId) || id.endsWith("0" + shortId)))) {
+            return id;
+          }
+        }
+      }
+    } catch (JSONException ex) {
+      // ignore
+    }
+    return null;
+  }
+
+  private ApplicationReport assertRunningApp(ApplicationReport app)
+  {
+    ApplicationReport r;
+    try {
+      r = yarnClient.getApplicationReport(app.getApplicationId());
+      if (r.getYarnApplicationState() != YarnApplicationState.RUNNING) {
+        String msg = String.format("Application %s not running (status %s)",
+            r.getApplicationId().getId(), r.getYarnApplicationState());
+        throw new CliException(msg);
+      }
+    } catch (YarnException rmExc) {
+      throw new CliException("Unable to determine application status", rmExc);
+    } catch (IOException rmExc) {
+      throw new CliException("Unable to determine application status", rmExc);
+    }
+    return r;
+  }
+
+  private JSONObject getResource(String resourcePath, ApplicationReport appReport)
+  {
+    return getResource(new StramAgent.StramUriSpec().path(resourcePath), appReport, new WebServicesClient.GetWebServicesHandler<JSONObject>());
+  }
+
+  private JSONObject getResource(StramAgent.StramUriSpec uriSpec, ApplicationReport appReport)
+  {
+    return getResource(uriSpec, appReport, new WebServicesClient.GetWebServicesHandler<JSONObject>());
+  }
+
+  private JSONObject getResource(StramAgent.StramUriSpec uriSpec, ApplicationReport appReport, WebServicesClient.WebServicesHandler handler)
+  {
+
+    if (appReport == null) {
+      throw new CliException("No application selected");
+    }
+
+    if (StringUtils.isEmpty(appReport.getTrackingUrl()) || appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
+      appReport = null;
+      throw new CliException("Application terminated");
+    }
+
+    WebServicesClient wsClient = new WebServicesClient();
+    try {
+      return stramAgent.issueStramWebRequest(wsClient, appReport.getApplicationId().toString(), uriSpec, handler);
+    } catch (Exception e) {
+      // check the application status as above may have failed due application termination etc.
+      if (appReport == currentApp) {
+        currentApp = assertRunningApp(appReport);
+      }
+      throw new CliException("Failed to request web service for appid " + appReport.getApplicationId().toString(), e);
+    }
+  }
+
+  private List<AppFactory> getMatchingAppFactories(StramAppLauncher submitApp, String matchString, boolean exactMatch)
+  {
+    try {
+      List<AppFactory> cfgList = submitApp.getBundledTopologies();
+
+      if (cfgList.isEmpty()) {
+        return null;
+      } else if (matchString == null) {
+        return cfgList;
+      } else {
+        List<AppFactory> result = new ArrayList<>();
+        if (!exactMatch) {
+          matchString = matchString.toLowerCase();
+        }
+        for (AppFactory ac : cfgList) {
+          String appName = ac.getName();
+          String appAlias = submitApp.getLogicalPlanConfiguration().getAppAlias(appName);
+          if (exactMatch) {
+            if (matchString.equals(appName) || matchString.equals(appAlias)) {
+              result.add(ac);
+            }
+          } else if (appName.toLowerCase().contains(matchString) || (appAlias != null && appAlias.toLowerCase()
+              .contains(matchString))) {
+            result.add(ac);
+          }
+        }
+        return result;
+      }
+    } catch (Exception ex) {
+      LOG.warn("Caught Exception: ", ex);
+      return null;
+    }
+  }
+
+  /*
+   * Below is the implementation of all commands
+   */
+  private class HelpCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      PrintStream os = getOutputPrintStream();
+      if (args.length < 2) {
+        os.println("GLOBAL COMMANDS EXCEPT WHEN CHANGING LOGICAL PLAN:\n");
+        printHelp(globalCommands, os);
+        os.println();
+        os.println("COMMANDS WHEN CONNECTED TO AN APP (via connect <appid>) EXCEPT WHEN CHANGING LOGICAL PLAN:\n");
+        printHelp(connectedCommands, os);
+        os.println();
+        os.println("COMMANDS WHEN CHANGING LOGICAL PLAN (via begin-logical-plan-change):\n");
+        printHelp(logicalPlanChangeCommands, os);
+        os.println();
+      } else {
+        if (args[1].equals("help")) {
+          printHelp("help", globalCommands.get("help"), os);
+        } else {
+          boolean valid = false;
+          CommandSpec cs = globalCommands.get(args[1]);
+          if (cs != null) {
+            os.println("This usage is valid except when changing logical plan");
+            printHelp(args[1], cs, os);
+            os.println();
+            valid = true;
+          }
+          cs = connectedCommands.get(args[1]);
+          if (cs != null) {
+            os.println("This usage is valid when connected to an app except when changing logical plan");
+            printHelp(args[1], cs, os);
+            os.println();
+            valid = true;
+          }
+          cs = logicalPlanChangeCommands.get(args[1]);
+          if (cs != null) {
+            os.println("This usage is only valid when changing logical plan (via begin-logical-plan-change)");
+            printHelp(args[1], cs, os);
+            os.println();
+            valid = true;
+          }
+          if (!valid) {
+            os.println("Help for \"" + args[1] + "\" does not exist.");
+          }
+        }
+      }
+      closeOutputPrintStream(os);
+    }
+
+  }
+
+  private class EchoCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      for (int i = 1; i < args.length; i++) {
+        if (i > 1) {
+          System.out.print(" ");
+        }
+        System.out.print(args[i]);
+      }
+      System.out.println();
+    }
+  }
+
+  private class ConnectCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      currentApp = getApplication(args[1]);
+      if (currentApp == null) {
+        throw new CliException("Streaming application with id " + args[1] + " is not found.");
+      }
+      LOG.debug("Selected {} with tracking url {}", currentApp.getApplicationId(), currentApp.getTrackingUrl());
+      getResource(StramWebServices.PATH_INFO, currentApp);
+      if (consolePresent) {
+        System.out.println("Connected to application " + currentApp.getApplicationId());
+      }
+    }
+
+  }
+
+  private class LaunchCommand implements Command
+  {
+    @Override
+    @SuppressWarnings("SleepWhileInLoop")
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      String[] newArgs = new String[args.length - 1];
+      System.arraycopy(args, 1, newArgs, 0, args.length - 1);
+      LaunchCommandLineInfo commandLineInfo = getLaunchCommandLineInfo(newArgs);
+
+      if (commandLineInfo.configFile != null) {
+        commandLineInfo.configFile = expandFileName(commandLineInfo.configFile, true);
+      }
+
+      // see if the given config file is a config package
+      ConfigPackage cp = null;
+      String requiredAppPackageName = null;
+      try {
+        cp = new ConfigPackage(new File(commandLineInfo.configFile));
+        requiredAppPackageName = cp.getAppPackageName();
+      } catch (Exception ex) {
+        // fall through, it's not a config package
+      }
+      try {
+        Configuration config;
+        String configFile = cp == null ? commandLineInfo.configFile : null;
+        try {
+          config = StramAppLauncher.getOverriddenConfig(StramClientUtils.addDTSiteResources(new Configuration()), configFile, commandLineInfo.overrideProperties);
+          if (commandLineInfo.libjars != null) {
+            commandLineInfo.libjars = expandCommaSeparatedFiles(commandLineInfo.libjars);
+            if (commandLineInfo.libjars != null) {
+              config.set(StramAppLauncher.LIBJARS_CONF_KEY_NAME, commandLineInfo.libjars);
+            }
+          }
+          if (commandLineInfo.files != null) {
+            commandLineInfo.files = expandCommaSeparatedFiles(commandLineInfo.files);
+            if (commandLineInfo.files != null) {
+              config.set(StramAppLauncher.FILES_CONF_KEY_NAME, commandLineInfo.files);
+            }
+          }
+          if (commandLineInfo.archives != null) {
+            commandLineInfo.archives = expandCommaSeparatedFiles(commandLineInfo.archives);
+            if (commandLineInfo.archives != null) {
+              config.set(StramAppLauncher.ARCHIVES_CONF_KEY_NAME, commandLineInfo.archives);
+            }
+          }
+          if (commandLineInfo.origAppId != null) {
+            config.set(StramAppLauncher.ORIGINAL_APP_ID, commandLineInfo.origAppId);
+          }
+          config.set(StramAppLauncher.QUEUE_NAME, commandLineInfo.queue != null ? commandLineInfo.queue : "default");
+        } catch (Exception ex) {
+          throw new CliException("Error opening the config XML file: " + configFile, ex);
+        }
+        StramAppLauncher submitApp;
+        AppFactory appFactory = null;
+        String matchString = null;
+        if (commandLineInfo.args.length == 0) {
+          if (commandLineInfo.origAppId == null) {
+            throw new CliException("Launch requires an APA or JAR file when not resuming a terminated application");
+          }
+          submitApp = new StramAppLauncher(fs, config);
+          appFactory = submitApp.new RecoveryAppFactory();
+        } else {
+          String fileName = expandFileName(commandLineInfo.args[0], true);
+          if (commandLineInfo.args.length >= 2) {
+            matchString = commandLineInfo.args[1];
+          }
+          if (fileName.endsWith(".json")) {
+            File file = new File(fileName);
+            submitApp = new StramAppLauncher(file.getName(), config);
+            appFactory = new StramAppLauncher.JsonFileAppFactory(file);
+            if (matchString != null) {
+              LOG.warn("Match string \"{}\" is ignored for launching applications specified in JSON", matchString);
+            }
+          } else if (fileName.endsWith(".properties")) {
+            File file = new File(fileName);
+            submitApp = new StramAppLauncher(file.getName(), config);
+            appFactory = new StramAppLauncher.PropertyFileAppFactory(file);
+            if (matchString != null) {
+              LOG.warn("Match string \"{}\" is ignored for launching applications specified in properties file", matchString);
+            }
+          } else {
+            // see if it's an app package
+            AppPackage ap = null;
+            try {
+              ap = newAppPackageInstance(new File(fileName));
+            } catch (Exception ex) {
+              // It's not an app package
+              if (requiredAppPackageName != null) {
+                throw new CliException("Config package requires an app package name of \"" + requiredAppPackageName + "\"");
+              }
+            }
+
+            if (ap != null) {
+              try {
+                if (!commandLineInfo.force) {
+                  checkPlatformCompatible(ap);
+                  checkConfigPackageCompatible(ap, cp);
+                }
+                launchAppPackage(ap, cp, commandLineInfo, reader);
+                return;
+              } finally {
+                IOUtils.closeQuietly(ap);
+              }
+            }
+            submitApp = getStramAppLauncher(fileName, config, commandLineInfo.ignorePom);
+          }
+        }
+        submitApp.loadDependencies();
+
+        if (commandLineInfo.origAppId != null) {
+          // ensure app is not running
+          ApplicationReport ar = null;
+          try {
+            ar = getApplication(commandLineInfo.origAppId);
+          } catch (Exception e) {
+            // application (no longer) in the RM history, does not prevent restart from state in DFS
+            LOG.debug("Cannot determine status of application {} {}", commandLineInfo.origAppId, ExceptionUtils.getMessage(e));
+          }
+          if (ar != null) {
+            if (ar.getFinalApplicationStatus() == FinalApplicationStatus.UNDEFINED) {
+              throw new CliException("Cannot relaunch non-terminated application: " + commandLineInfo.origAppId + " " + ar.getYarnApplicationState());
+            }
+            if (appFactory == null && matchString == null) {
+              // skip selection if we can match application name from previous run
+              List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, ar.getName(), commandLineInfo.exactMatch);
+              for (AppFactory af : matchingAppFactories) {
+                String appName = submitApp.getLogicalPlanConfiguration().getAppAlias(af.getName());
+                if (appName == null) {
+                  appName = af.getName();
+                }
+                // limit to exact match
+                if (appName.equals(ar.getName())) {
+                  appFactory = af;
+                  break;
+                }
+              }
+            }
+          }
+        }
+
+        if (appFactory == null && matchString != null) {
+          // attempt to interpret argument as property file - do we still need it?
+          try {
+            File file = new File(expandFileName(commandLineInfo.args[1], true));
+            if (file.exists()) {
+              if (commandLineInfo.args[1].endsWith(".properties")) {
+                appFactory = new StramAppLauncher.PropertyFileAppFactory(file);
+              } else if (commandLineInfo.args[1].endsWith(".json")) {
+                appFactory = new StramAppLauncher.JsonFileAppFactory(file);
+              }
+            }
+          } catch (Exception | NoClassDefFoundError ex) {
+            // ignore
+          }
+        }
+
+        if (appFactory == null) {
+          List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, matchString, commandLineInfo.exactMatch);
+          if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
+            throw new CliException("No applications matching \"" + matchString + "\" bundled in jar.");
+          } else if (matchingAppFactories.size() == 1) {
+            appFactory = matchingAppFactories.get(0);
+          } else if (matchingAppFactories.size() > 1) {
+
+            //Store the appNames sorted in alphabetical order and their position in matchingAppFactories list
+            TreeMap<String, Integer> appNamesInAlphabeticalOrder = new TreeMap<>();
+            // Display matching applications
+            for (int i = 0; i < matchingAppFactories.size(); i++) {
+              String appName = matchingAppFactories.get(i).getName();
+              String appAlias = submitApp.getLogicalPlanConfiguration().getAppAlias(appName);
+              if (appAlias != null) {
+                appName = appAlias;
+              }
+              appNamesInAlphabeticalOrder.put(appName, i);
+            }
+
+            //Create a mapping between the app display number and original index at matchingAppFactories
+            int index = 1;
+            HashMap<Integer, Integer> displayIndexToOriginalUnsortedIndexMap = new HashMap<>();
+            for (Map.Entry<String, Integer> entry : appNamesInAlphabeticalOrder.entrySet()) {
+              //Map display number of the app to original unsorted index
+              displayIndexToOriginalUnsortedIndexMap.put(index, entry.getValue());
+
+              //Display the app names
+              System.out.printf("%3d. %s\n", index++, entry.getKey());
+            }
+
+            // Exit if not in interactive mode
+            if (!consolePresent) {
+              throw new CliException("More than one application in jar file match '" + matchString + "'");
+            } else {
+
+              boolean useHistory = reader.isHistoryEnabled();
+              reader.setHistoryEnabled(false);
+              History previousHistory = reader.getHistory();
+              History dummyHistory = new MemoryHistory();
+              reader.setHistory(dummyHistory);
+              List<Completer> completers = new ArrayList<>(reader.getCompleters());
+              for (Completer c : completers) {
+                reader.removeCompleter(c);
+              }
+              reader.setHandleUserInterrupt(true);
+              String optionLine;
+              try {
+                optionLine = reader.readLine("Choose application: ");
+              } finally {
+                reader.setHandleUserInterrupt(false);
+                reader.setHistoryEnabled(useHistory);
+                reader.setHistory(previousHistory);
+                for (Completer c : completers) {
+                  reader.addCompleter(c);
+                }
+              }
+              try {
+                int option = Integer.parseInt(optionLine);
+                if (0 < option && option <= matchingAppFactories.size()) {
+                  int appIndex = displayIndexToOriginalUnsortedIndexMap.get(option);
+                  appFactory = matchingAppFactories.get(appIndex);
+                }
+              } catch (Exception ex) {
+                // ignore
+              }
+            }
+          }
+
+        }
+
+        if (appFactory != null) {
+          if (!commandLineInfo.localMode) {
+
+            // see whether there is an app with the same name and user name running
+            String appNameAttributeName = StreamingApplication.DT_PREFIX + Context.DAGContext.APPLICATION_NAME.getName();
+            String appName = config.get(appNameAttributeName, appFactory.getName());
+            ApplicationReport duplicateApp = StramClientUtils.getStartedAppInstanceByName(yarnClient, appName, UserGroupInformation.getLoginUser().getUserName(), null);
+            if (duplicateApp != null) {
+              throw new CliException("Application with the name \"" + duplicateApp.getName() + "\" already running under the current user \"" + duplicateApp.getUser() + "\". Please choose another name. You can change the name by setting " + appNameAttributeName);
+            }
+
+            // This is for suppressing System.out printouts from applications so that the user of CLI will not be confused by those printouts
+            PrintStream originalStream = suppressOutput();
+            ApplicationId appId = null;
+            try {
+              if (raw) {
+                PrintStream dummyStream = new PrintStream(new OutputStream()
+                {
+                  @Override
+                  public void write(int b)
+                  {
+                    // no-op
+                  }
+
+                });
+                System.setOut(dummyStream);
+              }
+              appId = submitApp.launchApp(appFactory);
+              currentApp = yarnClient.getApplicationReport(appId);
+            } finally {
+              restoreOutput(originalStream);
+            }
+            if (appId != null) {
+              printJson("{\"appId\": \"" + appId + "\"}");
+            }
+          } else {
+            submitApp.runLocal(appFactory);
+          }
+        } else {
+          System.err.println("No application specified.");
+        }
+
+      } finally {
+        IOUtils.closeQuietly(cp);
+      }
+    }
+
+  }
+
+  private class GetConfigParameterCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      PrintStream os = getOutputPrintStream();
+      if (args.length == 1) {
+        Map<String, String> sortedMap = new TreeMap<>();
+        for (Map.Entry<String, String> entry : conf) {
+          sortedMap.put(entry.getKey(), entry.getValue());
+        }
+        for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
+          os.println(entry.getKey() + "=" + entry.getValue());
+        }
+      } else {
+        String value = conf.get(args[1]);
+        if (value != null) {
+          os.println(value);
+        }
+      }
+      closeOutputPrintStream(os);
+    }
+
+  }
+
+  private class ShutdownAppCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      ApplicationReport[] apps;
+      if (args.length == 1) {
+        if (currentApp == null) {
+          throw new CliException("No application selected");
+        } else {
+          apps = new ApplicationReport[]{currentApp};
+        }
+      } else {
+        apps = new ApplicationReport[args.length - 1];
+        for (int i = 1; i < args.length; i++) {
+          apps[i - 1] = getApplication(args[i]);
+          if (apps[i - 1] == null) {
+            throw new CliException("Streaming application with id " + args[i] + " is not found.");
+          }
+        }
+      }
+
+      for (ApplicationReport app : apps) {
+        try {
+          JSONObject response = getResource(new StramAgent.StramUriSpec().path(StramWebServices.PATH_SHUTDOWN), app, new WebServicesClient.WebServicesHandler<JSONObject>()
+          {
+            @Override
+            public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
+            {
+              return webResource.accept(MediaType.APPLICATION_JSON).post(clazz, new JSONObject());
+            }
+
+          });
+          if (consolePresent) {
+            System.out.println("Shutdown requested: " + response);
+          }
+          currentApp = null;
+        } catch (Exception e) {
+          throw new CliException("Failed to request shutdown for appid " + app.getApplicationId().toString(), e);
+        }
+      }
+    }
+
+  }
+
+  private class ListAppsCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      try {
+        JSONArray jsonArray = new JSONArray();
+        List<ApplicationReport> appList = getApplicationList();
+        Collections.sort(appList, new Comparator<ApplicationReport>()
+        {
+          @Override
+          public int compare(ApplicationReport o1, ApplicationReport o2)
+          {
+            return o1.getApplicationId().getId() - o2.getApplicationId().getId();
+          }
+
+        });
+        int totalCnt = 0;
+        int runningCnt = 0;
+
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z");
+
+        for (ApplicationReport ar : appList) {
+          /*
+           * This is inefficient, but what the heck, if this can be passed through the command line, can anyone notice slowness.
+           */
+          JSONObject jsonObj = new JSONObject();
+          jsonObj.put("startTime", sdf.format(new java.util.Date(ar.getStartTime())));
+          jsonObj.put("id", ar.getApplicationId().getId());
+          jsonObj.put("name", ar.getName());
+          jsonObj.put("state", ar.getYarnApplicationState().name());
+          jsonObj.put("trackingUrl", ar.getTrackingUrl());
+          jsonObj.put("finalStatus", ar.getFinalApplicationStatus());
+
+          totalCnt++;
+          if (ar.getYarnApplicationState() == YarnApplicationState.RUNNING) {
+            runningCnt++;
+          }
+
+          if (args.length > 1) {
+            if (StringUtils.isNumeric(args[1])) {
+              if (jsonObj.getString("id").equals(args[1])) {
+                jsonArray.put(jsonObj);
+                break;
+              }
+            } else {
+              @SuppressWarnings("unchecked")
+              Iterator<String> keys = jsonObj.keys();
+              while (keys.hasNext()) {
+                if (jsonObj.get(keys.next()).toString().toLowerCase().contains(args[1].toLowerCase())) {
+                  jsonArray.put(jsonObj);
+                  break;
+                }
+              }
+            }
+          } else {
+            jsonArray.put(jsonObj);
+          }
+        }
+        printJson(jsonArray, "apps");
+        if (consolePresent) {
+          System.out.println(runningCnt + " active, total " + totalCnt + " applications.");
+        }
+      } catch (Exception ex) {
+        throw new CliException("Failed to retrieve application list", ex);
+      }
+    }
+
+  }
+
+  private class KillAppCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      if (args.length == 1) {
+        if (currentApp == null) {
+          throw new CliException("No application selected");
+        } else {
+          try {
+            yarnClient.killApplication(currentApp.getApplicationId());
+            currentApp = null;
+          } catch (YarnException e) {
+            throw new CliException("Failed to kill " + currentApp.getApplicationId(), e);
+          }
+        }
+        if (consolePresent) {
+          System.out.println("Kill app requested");
+        }
+        return;
+      }
+
+      ApplicationReport app = null;
+      int i = 0;
+      try {
+        while (++i < args.length) {
+          app = getApplication(args[i]);
+          if (app == null) {
+            throw new CliException("Streaming application with id " + args[i] + " is not found.");
+          }
+          yarnClient.killApplication(app.getApplicationId());
+          if (app == currentApp) {
+            currentApp = null;
+          }
+        }
+        if (consolePresent) {
+          System.out.println("Kill app requested");
+        }
+      } catch (YarnException e) {
+        throw new CliException("Failed to kill " + ((app == null || app.getApplicationId() == null) ? "unknown application" : app.getApplicationId()) + ". Aborting killing of any additional applications.", e);
+      } catch (NumberFormatException nfe) {
+        throw new CliException("Invalid application Id " + args[i], nfe);
+      }
+    }
+
+  }
+
+  private class AliasCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      if (args[1].equals(args[2])) {
+        throw new CliException("Alias to itself!");
+      }
+      aliases.put(args[1], args[2]);
+      if (consolePresent) {
+        System.out.println("Alias " + args[1] + " created.");
+      }
+      updateCompleter(reader);
+    }
+
+  }
+
+  private class SourceCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      processSourceFile(args[1], reader);
+      if (consolePresent) {
+        System.out.println("File " + args[1] + " sourced.");
+      }
+    }
+
+  }
+
+  private class ExitCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      if (topLevelHistory != null) {
+        try {
+          topLevelHistory.flush();
+        } catch (IOException ex) {
+          LOG.warn("Cannot flush command history");
+        }
+      }
+      if (changingLogicalPlanHistory != null) {
+        try {
+          changingLogicalPlanHistory.flush();
+        } catch (IOException ex) {
+          LOG.warn("Cannot flush command history");
+        }
+      }
+      System.exit(0);
+    }
+
+  }
+
+  private class ListContainersCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      JSONObject json = getResource(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS, currentApp);
+      if (args.length == 1) {
+        printJson(json);
+      } else {
+        Object containersObj = json.get("containers");
+        JSONArray containers;
+        if (containersObj instanceof JSONArray) {
+          containers = (JSONArray)containersObj;
+        } else {
+          containers = new JSONArray();
+          containers.put(containersObj);
+        }
+        if (containersObj == null) {
+          System.out.println("No containers found!");
+        } else {
+          JSONArray resultContainers = new JSONArray();
+          for (int o = containers.length(); o-- > 0; ) {
+            JSONObject container = containers.getJSONObject(o);
+            String id = container.getString("id");
+            if (id != null && !id.isEmpty()) {
+              for (int argc = args.length; argc-- > 1; ) {
+                String s1 = "0" + args[argc];
+                String s2 = "_" + args[argc];
+                if (id.equals(args[argc]) || id.endsWith(s1) || id.endsWith(s2)) {
+                  resultContainers.put(container);
+                }
+              }
+            }
+          }
+          printJson(resultContainers, "containers");
+        }
+      }
+    }
+
+  }
+
+  private class ListOperatorsCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      JSONObject json = getResource(StramWebServices.PATH_PHYSICAL_PLAN_OPERATORS, currentApp);
+
+      if (args.length > 1) {
+        String singleKey = "" + json.keys().next();
+        JSONArray matches = new JSONArray();
+        // filter operators
+        JSONArray arr;
+        Object obj = json.get(singleKey);
+        if (obj instanceof JSONArray) {
+          arr = (JSONArray)obj;
+        } else {
+          arr = new JSONArray();
+          arr.put(obj);
+        }
+        for (int i = 0; i < arr.length(); i++) {
+          JSONObject oper = arr.getJSONObject(i);
+          if (StringUtils.isNumeric(args[1])) {
+            if (oper.getString("id").equals(args[1])) {
+              matches.put(oper);
+              break;
+            }
+          } else {
+            @SuppressWarnings("unchecked")
+            Iterator<String> keys = oper.keys();
+            while (keys.hasNext()) {
+              if (oper.get(keys.next()).toString().toLowerCase().contains(args[1].toLowerCase())) {
+                matches.put(oper);
+                break;
+              }
+            }
+          }
+        }
+        json.put(singleKey, matches);
+      }
+
+      printJson(json);
+    }
+
+  }
+
+  private class ShowPhysicalPlanCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      try {
+        printJson(getResource(StramWebServices.PATH_PHYSICAL_PLAN, currentApp));
+      } catch (Exception e) {
+        throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
+      }
+    }
+
+  }
+
+  private class KillContainerCommand implements Command
+  {
+    @Override
+    public void execute(String[] args, ConsoleReader reader) throws Exception
+    {
+      for (int i = 1; i < args.length; i++) {
+        String containerLongId = getContainerLongId(args[i]);
+        if (containerLongId == null) {
+          throw new CliException("Container " + args[i] + " not found");
+        }
+        try {
+          StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
+          uriSpec = uriSpec.path(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS).path(URLEncoder.encode(containerLongId, "UTF-8")).path("kill");
+          JSONObject response = getResource(uriSpec, currentApp, new WebServicesClient.WebServicesHandler<JSONObject>()
+          {
+            @Override
+            public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
+            {
+              return webResource.accept(MediaType.APPLICATION_JSON).post(clazz, new JSONObject());
+            }
+
+          });

<TRUNCATED>


Mime
View raw message