ambari-commits mailing list archives

From alejan...@apache.org
Subject ambari git commit: AMBARI-12455. RU - Magician Script to correct data inconsistencies, allow retrying repo installation, force finalize to versions, etc (alejandro)
Date Mon, 10 Aug 2015 21:51:38 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 a359fc626 -> 42a44e4af


AMBARI-12455. RU - Magician Script to correct data inconsistencies, allow retrying repo installation, force finalize to versions, etc (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/42a44e4a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/42a44e4a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/42a44e4a

Branch: refs/heads/branch-2.1
Commit: 42a44e4af987fc4eeda948419723dc6655719a76
Parents: a359fc6
Author: Alejandro Fernandez <afernandez@hortonworks.com>
Authored: Mon Aug 10 14:51:28 2015 -0700
Committer: Alejandro Fernandez <afernandez@hortonworks.com>
Committed: Mon Aug 10 14:51:28 2015 -0700

----------------------------------------------------------------------
 contrib/ru_magician/README.md        |  42 ++
 contrib/ru_magician/lgpl_license.txt | 165 ++++++
 contrib/ru_magician/mit_license.txt  |  21 +
 contrib/ru_magician/ru_magician.py   | 908 ++++++++++++++++++++++++++++++
 pom.xml                              |   2 +
 5 files changed, 1138 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/42a44e4a/contrib/ru_magician/README.md
----------------------------------------------------------------------
diff --git a/contrib/ru_magician/README.md b/contrib/ru_magician/README.md
new file mode 100644
index 0000000..497c1e7
--- /dev/null
+++ b/contrib/ru_magician/README.md
@@ -0,0 +1,42 @@
+# Ambari Rolling Upgrade Magician
+
+
+**Script**: ru_magician.py
+
+## Licenses Used:
+* MIT License
+* GNU Lesser General Public License v3
+
+This script is provided as is with no guarantees.
+
+## Pre-reqs
+* Apache Ambari 2.0 or higher
+* MySQL or Postgres database, and appropriate python package to connect to the database
+
+## Tested with
+* Apache Ambari 2.0 and HDP 2.2
+* Apache Ambari 2.1 and HDP 2.2 and 2.3
+
+## Description:
+This script is meant to be used on clusters deployed with Ambari and using Ambari version 2.0.0 or higher, in order
+to identify any inconsistencies in the database related to the version of components for the HDP stack.
+The script will pinpoint problems, and ask the user to take corrective action to update their database.
+As of this version, this script only supports MySQL and Postgres.
+
+It handles the following cases:
+
+1. On a newly installed cluster, ensures that there is at least one cluster version whose state is CURRENT. If not, will advise the user to restart services.
+
+2. If the user has Registered and Installed repositories, checks that each one has a unique version and display name. Further, if any are stuck in an INSTALLING state, lets the user take one of three actions: leave as is, force to INSTALLED, or force to INSTALL_FAILED (in order to retry).
+
+3. If the user has Registered and Installed repositories, and one cluster_version is already in an UPGRADING state, perhaps because hdp-select changed the symlinks and a component was restarted, or the user inadvertently started a manual upgrade, will allow the user to force it back to INSTALLED.
+
+4. If the user is in the middle of an upgrade and wants to force one of the versions to CURRENT, it will update all database records accordingly and set the previously CURRENT version to INSTALLED.
+
+## Instructions to run:
+* Install Python 2.6 or higher
+* Ensure that /etc/ambari-server/conf/ambari.properties is accessible
+* Stop ambari-server process
+* If using a MySQL database, install the PyMySQL package for Python, which the script imports as pymysql. E.g., yum install python-setuptools && easy_install pip && pip install PyMySQL
+* If using a Postgres database, install the Postgres database package for Python. E.g., yum install python-psycopg2
+* From the Ambari Server host, run the file as, python ru_magician.py
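
The instructions above depend on the script being able to read the JDBC settings from ambari.properties. As a rough illustration only (not the code in this commit), here is a minimal sketch of parsing such a file. The function name `parse_properties` and the sample values are hypothetical; only the property keys appear in the script. It splits each line on the first "=" so that values containing "=" (for example a JDBC URL with query parameters) survive intact.

```python
def parse_properties(text):
    """Parse simple key=value lines into a dict (simplified sketch)."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and comment lines.
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so the value keeps any later "=".
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# Hypothetical excerpt of ambari.properties, for illustration only.
sample = """
# database settings
server.jdbc.database=postgres
server.jdbc.database_name=ambari
server.jdbc.url=jdbc:mysql://db.example.com:3306/ambari?useSSL=false
"""

props = parse_properties(sample)
print(props["server.jdbc.database"].upper())
print(props["server.jdbc.url"])
```

Splitting with `partition` rather than a plain `split("=")` is a design choice for this sketch: it avoids having to rejoin the pieces of a value afterwards.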

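The "Apache Ambari 2.0 or higher" prerequisite is checked in the script by comparing dotted version strings segment by segment. The script itself targets Python 2 and relies on the `cmp` builtin, which Python 3 no longer provides. The following is a sketch of the same idea that runs on both, assuming every segment is purely numeric; the standalone function names are illustrative and are not the methods in the commit.

```python
def normalize_version(version, desired_segments=0):
    """Turn "2.1" into [2, 1], padding with zeros up to desired_segments."""
    parts = version.split(".")
    if len(parts) < desired_segments:
        parts = parts + ["0"] * (desired_segments - len(parts))
    return [int(p) for p in parts]


def compare_versions(version1, version2):
    """Return -1, 0, or 1 if version1 is before, equal to, or after version2."""
    segments = max(len(version1.split(".")), len(version2.split(".")))
    a = normalize_version(version1, segments)
    b = normalize_version(version2, segments)
    # Lists of ints compare element by element; this replaces Python 2's cmp().
    return (a > b) - (a < b)


MIN_AMBARI_VERSION = "2.0.0"
print(compare_versions("1.7.0", MIN_AMBARI_VERSION) < 0)
```

Comparing lists of integers rather than raw strings matters for versions such as 2.10.0 and 2.9.0, where a plain string comparison would order them incorrectly.
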
http://git-wip-us.apache.org/repos/asf/ambari/blob/42a44e4a/contrib/ru_magician/lgpl_license.txt
----------------------------------------------------------------------
diff --git a/contrib/ru_magician/lgpl_license.txt b/contrib/ru_magician/lgpl_license.txt
new file mode 100644
index 0000000..65c5ca8
--- /dev/null
+++ b/contrib/ru_magician/lgpl_license.txt
@@ -0,0 +1,165 @@
+                   GNU LESSER GENERAL PUBLIC LICENSE
+                       Version 3, 29 June 2007
+
+ Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+
+  This version of the GNU Lesser General Public License incorporates
+the terms and conditions of version 3 of the GNU General Public
+License, supplemented by the additional permissions listed below.
+
+  0. Additional Definitions.
+
+  As used herein, "this License" refers to version 3 of the GNU Lesser
+General Public License, and the "GNU GPL" refers to version 3 of the GNU
+General Public License.
+
+  "The Library" refers to a covered work governed by this License,
+other than an Application or a Combined Work as defined below.
+
+  An "Application" is any work that makes use of an interface provided
+by the Library, but which is not otherwise based on the Library.
+Defining a subclass of a class defined by the Library is deemed a mode
+of using an interface provided by the Library.
+
+  A "Combined Work" is a work produced by combining or linking an
+Application with the Library.  The particular version of the Library
+with which the Combined Work was made is also called the "Linked
+Version".
+
+  The "Minimal Corresponding Source" for a Combined Work means the
+Corresponding Source for the Combined Work, excluding any source code
+for portions of the Combined Work that, considered in isolation, are
+based on the Application, and not on the Linked Version.
+
+  The "Corresponding Application Code" for a Combined Work means the
+object code and/or source code for the Application, including any data
+and utility programs needed for reproducing the Combined Work from the
+Application, but excluding the System Libraries of the Combined Work.
+
+  1. Exception to Section 3 of the GNU GPL.
+
+  You may convey a covered work under sections 3 and 4 of this License
+without being bound by section 3 of the GNU GPL.
+
+  2. Conveying Modified Versions.
+
+  If you modify a copy of the Library, and, in your modifications, a
+facility refers to a function or data to be supplied by an Application
+that uses the facility (other than as an argument passed when the
+facility is invoked), then you may convey a copy of the modified
+version:
+
+   a) under this License, provided that you make a good faith effort to
+   ensure that, in the event an Application does not supply the
+   function or data, the facility still operates, and performs
+   whatever part of its purpose remains meaningful, or
+
+   b) under the GNU GPL, with none of the additional permissions of
+   this License applicable to that copy.
+
+  3. Object Code Incorporating Material from Library Header Files.
+
+  The object code form of an Application may incorporate material from
+a header file that is part of the Library.  You may convey such object
+code under terms of your choice, provided that, if the incorporated
+material is not limited to numerical parameters, data structure
+layouts and accessors, or small macros, inline functions and templates
+(ten or fewer lines in length), you do both of the following:
+
+   a) Give prominent notice with each copy of the object code that the
+   Library is used in it and that the Library and its use are
+   covered by this License.
+
+   b) Accompany the object code with a copy of the GNU GPL and this license
+   document.
+
+  4. Combined Works.
+
+  You may convey a Combined Work under terms of your choice that,
+taken together, effectively do not restrict modification of the
+portions of the Library contained in the Combined Work and reverse
+engineering for debugging such modifications, if you also do each of
+the following:
+
+   a) Give prominent notice with each copy of the Combined Work that
+   the Library is used in it and that the Library and its use are
+   covered by this License.
+
+   b) Accompany the Combined Work with a copy of the GNU GPL and this license
+   document.
+
+   c) For a Combined Work that displays copyright notices during
+   execution, include the copyright notice for the Library among
+   these notices, as well as a reference directing the user to the
+   copies of the GNU GPL and this license document.
+
+   d) Do one of the following:
+
+       0) Convey the Minimal Corresponding Source under the terms of this
+       License, and the Corresponding Application Code in a form
+       suitable for, and under terms that permit, the user to
+       recombine or relink the Application with a modified version of
+       the Linked Version to produce a modified Combined Work, in the
+       manner specified by section 6 of the GNU GPL for conveying
+       Corresponding Source.
+
+       1) Use a suitable shared library mechanism for linking with the
+       Library.  A suitable mechanism is one that (a) uses at run time
+       a copy of the Library already present on the user's computer
+       system, and (b) will operate properly with a modified version
+       of the Library that is interface-compatible with the Linked
+       Version.
+
+   e) Provide Installation Information, but only if you would otherwise
+   be required to provide such information under section 6 of the
+   GNU GPL, and only to the extent that such information is
+   necessary to install and execute a modified version of the
+   Combined Work produced by recombining or relinking the
+   Application with a modified version of the Linked Version. (If
+   you use option 4d0, the Installation Information must accompany
+   the Minimal Corresponding Source and Corresponding Application
+   Code. If you use option 4d1, you must provide the Installation
+   Information in the manner specified by section 6 of the GNU GPL
+   for conveying Corresponding Source.)
+
+  5. Combined Libraries.
+
+  You may place library facilities that are a work based on the
+Library side by side in a single library together with other library
+facilities that are not Applications and are not covered by this
+License, and convey such a combined library under terms of your
+choice, if you do both of the following:
+
+   a) Accompany the combined library with a copy of the same work based
+   on the Library, uncombined with any other library facilities,
+   conveyed under the terms of this License.
+
+   b) Give prominent notice with the combined library that part of it
+   is a work based on the Library, and explaining where to find the
+   accompanying uncombined form of the same work.
+
+  6. Revised Versions of the GNU Lesser General Public License.
+
+  The Free Software Foundation may publish revised and/or new versions
+of the GNU Lesser General Public License from time to time. Such new
+versions will be similar in spirit to the present version, but may
+differ in detail to address new problems or concerns.
+
+  Each version is given a distinguishing version number. If the
+Library as you received it specifies that a certain numbered version
+of the GNU Lesser General Public License "or any later version"
+applies to it, you have the option of following the terms and
+conditions either of that published version or of any later version
+published by the Free Software Foundation. If the Library as you
+received it does not specify a version number of the GNU Lesser
+General Public License, you may choose any version of the GNU Lesser
+General Public License ever published by the Free Software Foundation.
+
+  If the Library as you received it specifies that a proxy can decide
+whether future versions of the GNU Lesser General Public License shall
+apply, that proxy's public statement of acceptance of any version is
+permanent authorization for you to choose that version for the
+Library.

http://git-wip-us.apache.org/repos/asf/ambari/blob/42a44e4a/contrib/ru_magician/mit_license.txt
----------------------------------------------------------------------
diff --git a/contrib/ru_magician/mit_license.txt b/contrib/ru_magician/mit_license.txt
new file mode 100644
index 0000000..16c0d63
--- /dev/null
+++ b/contrib/ru_magician/mit_license.txt
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2015, Apache Ambari
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.

http://git-wip-us.apache.org/repos/asf/ambari/blob/42a44e4a/contrib/ru_magician/ru_magician.py
----------------------------------------------------------------------
diff --git a/contrib/ru_magician/ru_magician.py b/contrib/ru_magician/ru_magician.py
new file mode 100644
index 0000000..0b1c31a
--- /dev/null
+++ b/contrib/ru_magician/ru_magician.py
@@ -0,0 +1,908 @@
+#!/usr/bin/env python
+
+'''
+Copyright (C)  2015, Apache Ambari
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+MIT License
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software
+and associated documentation files (the "Software"), to deal in the Software without restriction,
+including without limitation the rights to use, copy, modify, merge, publish, distribute,
+sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or
+substantial portions of the Software.
+
+GNU Lesser General Public License v1.3
+Permission is granted to copy, distribute and/or modify this document
+under the terms of the GNU Free Documentation License, Version 1.3
+or any later version published by the Free Software Foundation;
+with no Invariant Sections, no Front-Cover Texts, and no Back-Cover Texts.
+A copy of the license is included in the section entitled "GNU Free Documentation License".
+'''
+
+'''
+This script is provided as is with no guarantees.
+It is meant to be used on clusters deployed with Ambari and using Ambari version 2.0.0 or higher, in order
+to identify any inconsistencies in the database related to the version of components for the HDP stack.
+The script will pinpoint problems, and ask the user to take corrective action to update their database.
+As of this version, this script only supports MySQL and Postgres.
+
+Further, this script must be run from the host with Ambari-Server.
+'''
+
+# System imports
+import sys
+import os
+import logging
+import signal       # used to handle SIGINT and SIGTERM
+import subprocess   # used to check if ambari-server is running
+import re           # used to check if ambari-server is running when running regex on output
+
+
+from optparse import OptionParser
+
+Logger = logging.getLogger()
+
+AMBARI_PROPERTIES_LOCATION = "/etc/ambari-server/conf/ambari.properties"
+
+MIN_AMBARI_VERSION = "2.0.0"
+
+class DB_TYPE:
+  MYSQL = "MYSQL"
+  POSTGRES = "POSTGRES"
+
+class RUMagician:
+  """
+  Rolling Upgrade Magician analyzes the database to find and correct any issues.
+  It is a terminal-driven application, that prompts the user to select options.
+  """
+
+  def __init__(self, argv):
+    parser = OptionParser()
+    parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False)
+
+    (self.options, self.args) = parser.parse_args(argv)
+
+    # Log to stdout
+    logging_level = logging.DEBUG if self.options.verbose else logging.INFO
+    Logger.setLevel(logging_level)
+    ch = logging.StreamHandler(sys.stdout)
+    ch.setLevel(logging_level)
+    formatter = logging.Formatter('%(levelname)s - %(message)s')
+    ch.setFormatter(formatter)
+    Logger.addHandler(ch)
+
+    # Handle terminations gracefully
+    signal.signal(signal.SIGTERM, self.terminate)
+    signal.signal(signal.SIGINT, self.terminate)
+
+    self.print_usage()
+    self.configure()
+
+    if not self.check_ambari_server_process_down():
+      Logger.info("Ambari Server cannot be running while we make database updates. Please call \"ambari-server stop\" and try running this script again.")
+      self.terminate()
+
+    stage = None
+    while stage is None:
+      question = "What stage of Stack Upgrade are you in? Enter a number.\n" \
+                 "1) Just upgraded to Ambari {0} or higher, and do not see your stack version in the Versions page.\n".format(MIN_AMBARI_VERSION) + \
+                 "2) Registered a Repository and Installed Packages.\n" \
+                 "3) Performed an automated/manual upgrade and want to force it to the version I want.\n"
+
+      stage = self.ask_question(question, numeric=True)
+      try:
+        if stage:
+          stage = int(stage)
+      except ValueError, e:
+        stage = None
+
+      if stage == 1:
+        Logger.info("Selected {0}. Will verify that exactly one version exists and that it is marked as CURRENT.".format(stage))
+        self.check_exactly_one_current_version()
+      elif stage == 2:
+        Logger.info("Selected {0}. Will check that all Repository Versions are consistent.".format(stage))
+        self.check_repo_versions()
+        self.check_installation()
+      elif stage == 3:
+        Logger.info("Selected {0}. Will confirm that the cluster is in a good shape to finalize.".format(stage))
+        self.finalize()
+      else:
+        Logger.error("Invalid option \"{0}\", please try again.".format(stage))
+        stage = None
+
+  def ask_question(self, question, numeric=False):
+    """
+    :param question: Question to prompt.
+    :param numeric: If True, will cast to an integer.
+    :return: Returns the answer to the question, or None if not a complete string or couldn't cast to an integer.
+    If the user enters "q" at any time, will quit.
+    """
+    answer = raw_input(question)
+    if answer and answer != "":
+      answer = answer.strip()
+      if answer.lower() == "q":
+        Logger.info("Bye bye...")
+        self.terminate()  # this will close any open database connections
+
+    if answer == "":
+      answer = None
+
+    if numeric and answer is not None:
+      try:
+        answer = int(answer)
+      except Exception:
+        answer = None
+    return answer
+
+  def print_license(self):
+    # LGPL License header
+    license = "Permission is granted to copy, distribute and/or modify this document\n" \
+              "under the terms of the GNU Free Documentation License, Version 1.3\n" \
+              "or any later version published by the Free Software Foundation;\n" \
+              "with no Invariant Sections, no Front-Cover Texts, and no Back-Cover Texts.\n" \
+              "A copy of the license is included in the section entitled \"GNU Free Documentation License\".\n\n"
+    print(license)
+
+  def print_usage(self):
+    self.print_license()
+
+    msg = "\n*********************************************************************\n" \
+          "This script tries to find inconsistencies in your cluster\n" \
+          "while registering and installing versions.\n" \
+          "It will try to fix any problems,\n" \
+          "and assumes that you have Ambari {0} or higher,\n".format(MIN_AMBARI_VERSION) + \
+          "and are trying to perform either a manual or automatic stack upgrade.\n" \
+          "IMPORTANT, this script must be run from the host with Ambari Server.\n" \
+          "Further, please take a database backup before continuing.\n" \
+          "To exit, enter \"q\"\n" \
+          "*********************************************************************\n"
+    Logger.info(msg)
+
+  def terminate(self, signum=None, stack=None):
+    """
+    Exit gracefully, closing any open file handles or connections.
+    It is important to use print statements instead of Logging statements in case the logger is not yet
+    initialized.
+    :param signum: Usually SIGTERM, SIGINT, or None (if user entered "q").
+    :param stack: Stack trace
+    """
+    if signum:
+      print("Caught termination signal {0}. Will exit gracefully.".format(signum))
+    if hasattr(self, "cursor") or hasattr(self, "conn"):
+      try:
+        print("Will try to close database connection.")
+        if hasattr(self, "cursor") and self.cursor:
+          self.cursor.close()
+        if hasattr(self, "conn") and self.conn:
+          self.conn.close()
+        print("Closed database connection successfully.")
+      except Exception, e:
+        print("Unable to close database connection. Error: {0}\n".format(e.message))
+    sys.exit(0)
+
+  def check_ambari_server_process_down(self):
+    """
+    Before running any DB commands, ensure that Ambari Server is not running.
+    :return: Return True if ambari-server is not running, otherwise, False.
+    """
+    process_name = "ambari-server"
+    output = self.__find_process(process_name)
+    return re.search(process_name, output) is None
+
+  def __find_process(self, process_name):
+    ps = subprocess.Popen("ps -ef | grep {0} | grep -v grep".format(process_name), shell=True, stdout=subprocess.PIPE)
+    output = ps.stdout.read()
+    #Logger.debug("Checking if process {0} is running. Output: {1}.\n".format(process_name, output))
+    ps.stdout.close()
+    ps.wait()
+    return output
+
+  def configure(self):
+    """
+    Read configurations and ensure can connect to database.
+    """
+    # Defaults
+    self.db_type = DB_TYPE.POSTGRES
+    self.db_name = "ambari"
+    self.db_user = "ambari"
+    self.db_password = "bigdata"
+    self.db_host = "localhost"
+    self.db_url = None
+
+    if os.path.exists(AMBARI_PROPERTIES_LOCATION):
+      self.ambari_props = self.read_conf_file(AMBARI_PROPERTIES_LOCATION)
+
+      if "server.jdbc.database" in self.ambari_props:
+        self.db_type = self.ambari_props["server.jdbc.database"].upper()
+      if "server.jdbc.database_name" in self.ambari_props:
+        self.db_name = self.ambari_props["server.jdbc.database_name"]
+      if "server.jdbc.user.name" in self.ambari_props:
+        self.db_user = self.ambari_props["server.jdbc.user.name"]
+      if "server.jdbc.user.passwd" in self.ambari_props:
+        self.db_password = self.read_file(self.ambari_props["server.jdbc.user.passwd"])
+      if "server.jdbc.hostname" in self.ambari_props:
+        self.db_host = self.ambari_props["server.jdbc.hostname"]
+      if "server.jdbc.url" in self.ambari_props:
+        self.db_url = self.ambari_props["server.jdbc.url"]
+
+      Logger.info("Using database type: {0}, name: {1}, host: {2}".format(self.db_type, self.db_name, self.db_host))
+      connection_string = "dbname='{0}' user='{1}' host='{2}' password='{3}'".format(self.db_name, self.db_user, self.db_host, self.db_password)
+
+      if self.db_type == DB_TYPE.POSTGRES:
+        try:
+          import psycopg2     # covered by GNU Lesser General Public License
+        except Exception, e:
+          Logger.error("Need to install python-psycopg2 package for Postgres DB. E.g., yum install python-psycopg2\n")
+          self.terminate()
+      elif self.db_type == DB_TYPE.MYSQL:
+        try:
+          import pymysql      # covered by MIT License
+        except Exception, e:
+          Logger.error("Need to install PyMySQL package for Python. E.g., yum install python-setuptools && easy_install pip && pip install PyMySQL\n")
+          self.terminate()
+      else:
+        Logger.error("Unknown database type: {0}.".format(self.db_type))
+        self.terminate()
+
+      self.conn = None
+      self.cursor = None
+      try:
+        Logger.debug("Initializing database connection and cursor.")
+        if self.db_type == DB_TYPE.POSTGRES:
+          self.conn = psycopg2.connect(connection_string)
+          self.cursor = self.conn.cursor()
+        elif self.db_type == DB_TYPE.MYSQL:
+          self.conn = pymysql.connect(self.db_host, self.db_user, self.db_password, self.db_name)
+          self.cursor = self.conn.cursor()
+
+        Logger.debug("Created database connection and cursor.")
+        self.cursor.execute("SELECT metainfo_key, metainfo_value FROM metainfo WHERE metainfo_key='version';")
+        rows = self.cursor.fetchall()
+        if rows and len(rows) == 1:
+          self.ambari_version = rows[0][1]
+          Logger.info("Connected to database!!! Ambari version is {0}\n".format(self.ambari_version))
+
+          # Must be Ambari 2.0.0 or higher
+          if self.compare_versions(self.ambari_version, MIN_AMBARI_VERSION) < 0:
+            Logger.error("Must be running Ambari Version {0} or higher.\n".format(MIN_AMBARI_VERSION))
+            self.terminate()
+        else:
+          Logger.error("Unable to determine Ambari version.")
+          self.terminate()
+
+        self.set_cluster()
+      except Exception, e:
+        Logger.error("I am unable to connect to the database. Error: {0}\n".format(e))
+        self.terminate()
+    else:
+      raise Exception("Could not find file {0}".format(AMBARI_PROPERTIES_LOCATION))
+
+  def read_conf_file(self, file_path):
+    """
+    Parse the configuration file, and return a dictionary of key, value pairs.
+    Ignore any lines that begin with #
+    :param file_path: Properties file to parse.
+    :return: Dictionary with key, value pairs.
+    """
+    ambari_props = {}
+    if os.path.exists(file_path):
+      with open(file_path, "r") as f:
+        lines = f.readlines()
+        if lines:
+          Logger.debug("Reading file {0}, has {1} lines.".format(file_path, len(lines)))
+          for l in lines:
+            l = l.strip()
+            if l.startswith("#"):
+              continue
+            parts = l.split("=", 1)   # split on the first "=" only, so values keep any later "="
+            if len(parts) == 2:
+              prop = parts[0].strip()
+              value = parts[1].strip()
+              ambari_props[prop] = value
+    return ambari_props
+
+  def read_file(self, file_path):
+    """
+    :param file_path: File to read. Typically the ambari database password file.
+    :return: Return the contents of the file
+    """
+    if os.path.exists(file_path):
+      with open(file_path, "r") as f:
+        # Drop the trailing newline so it is not treated as part of the password.
+        return f.read().rstrip("\r\n")
+    return None
+
+  def compare_versions(self, version1, version2):
+    """
+    Used to compare Ambari versions.
+    E.g., Ambari version 2.0.1 vs 2.1.1.
+    :param version1: First parameter for version
+    :param version2: Second parameter for version
+    :return: Returns -1 if version1 is before version2, 0 if they are equal, and 1 if version1 is after version2
+    """
+    max_segments = max(len(version1.split(".")), len(version2.split(".")))
+    return cmp(self.__normalize_version(version1, desired_segments=max_segments), self.__normalize_version(version2, desired_segments=max_segments))
+
+  def __normalize_version(self, v, desired_segments=0):
+    """
+    :param v: Input string of the form "#.#.#" or "#.#.#.#"
+    :param desired_segments: If greater than 0, and if v has fewer segments than this parameter, will pad v with segments
+    containing "0" until the desired segments is reached.
+    :return: Returns a list of integers representing the segments of the version
+    """
+    v_list = v.split(".")
+    if desired_segments > 0 and len(v_list) < desired_segments:
+      v_list = v_list + ((desired_segments - len(v_list)) * ["0", ])
+    return [int(x) for x in v_list]
+
+  def set_cluster(self):
+    self.cluster_id = None
+    self.cluster_name = None
+    try:
+      query = "SELECT cluster_id, cluster_name FROM clusters ORDER BY cluster_name;"
+      self.cursor.execute(query)
+      rows = self.cursor.fetchall()
+      if rows:
+        if len(rows) == 1:
+          if len(rows[0]) == 2:
+            self.cluster_id = int(rows[0][0])
+            self.cluster_name = rows[0][1]
+        elif len(rows) > 1:
+          # Found multiple clusters
+          question = "We found multiple cluster names, which one should we use?\n"
+          for i in range(0, len(rows)):
+            question += "{0}) {1}\n".format(i+1, rows[i][1])
+
+          answer = None
+          while answer is None:
+            answer = self.ask_question(question, numeric=True)
+            if answer is not None and 0 < answer <= len(rows):
+              self.cluster_id = int(rows[answer - 1][0])
+              self.cluster_name = rows[answer - 1][1]
+            else:
+              Logger.error("Invalid option \"{0}\", please try again.\n".format(answer))
+              answer = None
+        pass
+
+        if self.cluster_name is None:
+          Logger.error("Unable to determine the cluster name.\n")
+          self.terminate()
+        else:
+          Logger.info("Selected cluster name: {0}\n".format(self.cluster_name))
+      else:
+        Logger.error("Unable to get cluster from query: {0}\n".format(query))
+        self.terminate()
+    except Exception, e:
+      Logger.error("Caught an exception. Error: {0}\n".format(e.message))
+      self.terminate()
+
+  def check_exactly_one_current_version(self):
+    """
+    If there are no cluster_version records, or host_version records, the user will have to restart at least one component
+    that can advertise a version. Ideally, they need to restart all services.
+
+    If there is exactly one cluster_version, and every host_version record corresponds to the same repo_version,
+    then need to ensure that all of these entities have a state of CURRENT.
+    If not, prompt user if they want to change all to CURRENT.
+    """
+    expected_state = "CURRENT"
+
+    query = "SELECT COUNT(*) FROM cluster_version;"
+    self.cursor.execute(query)
+    result = self.cursor.fetchone()
+    if result is None or len(result) != 1:
+      Logger.error("Unable to run query: {0}".format(query))
+      return
+
+    count = result[0]
+    if count == 0:
+      msg = "There are no cluster_versions. Start ambari-server, and then perform a Restart on one of the services.\n" + \
+        "Then navigate to the \"Stacks and Versions > Versions\" page and ensure you can see the stack version.\n" + \
+        "Next, restart all services, one-by-one, so that Ambari knows what version each component is running."
+      Logger.warning(msg)
+    elif count == 1:
+      query = "SELECT rv.repo_version_id, rv.version, cv.state FROM cluster_version cv JOIN repo_version rv ON cv.repo_version_id = rv.repo_version_id;"
+      self.cursor.execute(query)
+      result = self.cursor.fetchone()
+
+      repo_version_id = None
+      repo_version = None
+      cluster_version_state = None
+
+      if result and len(result) == 3:
+        repo_version_id = result[0]
+        repo_version = result[1]
+        cluster_version_state = result[2]
+
+      if repo_version_id and repo_version and cluster_version_state:
+        if cluster_version_state.upper() == expected_state:
+          Logger.info("Everything looks correct. Cluster Version {0} has a state of {1}.".format(repo_version, cluster_version_state))
+          self.check_all_hosts_current(repo_version_id, repo_version)
+        else:
+          Logger.error("Cluster Version {0} should have a state of {1} but is {2}. Make sure to restart all of the Services.".format(repo_version, expected_state, cluster_version_state))
+          # TODO, can we force it to CURRENT? And all Host Versions too?
+      else:
+        Logger.error("Unable to run query: {0}".format(query))
+    elif count > 1:
+      # Ensure exactly one Cluster Version is CURRENT
+      Logger.info("Found multiple Cluster versions, checking that exactly one is {0}.".format(expected_state))
+      query = "SELECT rv.repo_version_id, rv.version, cv.state FROM cluster_version cv JOIN repo_version rv ON cv.repo_version_id = rv.repo_version_id WHERE cv.state = '{0}';".format(expected_state)
+      self.cursor.execute(query)
+      rows = self.cursor.fetchall()
+      if rows:
+        if len(rows) == 1:
+          Logger.info("Good news; Cluster Version {0} has a state of {1}.".format(rows[0][1], expected_state))
+          self.check_all_hosts_current(rows[0][0], rows[0][1])
+        elif len(rows) > 1:
+          # Take the repo_version's version column
+          repo_versions = [row[1] for row in rows if len(row) == 3]
+          Logger.error("Found multiple cluster versions with a state of {0}, but only one should be {0}.\n" \
+                       "Will need to fix this manually, please contact Support. Cluster Versions found: {1}".format(expected_state, ", ".join(repo_versions)))
+      else:
+        Logger.error("Did not find any Cluster Version with a state of {0}. Query: {1}\n".format(expected_state, query))
+    pass
+
+  def check_all_hosts_current(self, repo_version_id, version_name):
+    """
+    Ensure that all of the hosts in the cluster have a state of CURRENT for the host_version that corresponds to the id.
+    :param repo_version_id: repo_version table's repo_version_id column
+    :param version_name: repo_version table's version column
+    """
+    if self.compare_versions(self.ambari_version, "2.1.0") < 0:
+      query1 = "SELECT chm.host_name from ClusterHostMapping chm JOIN clusters c ON chm.cluster_id = c.cluster_id WHERE c.cluster_name = '{0}';".format(self.cluster_name)
+    else:
+      query1 = "SELECT h.host_name from ClusterHostMapping chm JOIN clusters c ON chm.cluster_id = c.cluster_id JOIN hosts h ON chm.host_id = h.host_id WHERE c.cluster_name = '{0}';".format(self.cluster_name)
+
+    if self.compare_versions(self.ambari_version, "2.1.0") < 0:
+      query2 = "SELECT hv.host_name, hv.state FROM host_version hv WHERE hv.repo_version_id = {0};".format(repo_version_id)
+    else:
+      query2 = "SELECT h.host_name, hv.state FROM hosts h JOIN host_version hv ON h.host_id = hv.host_id WHERE hv.repo_version_id = {0};".format(repo_version_id)
+
+    # All cluster hosts
+    host_names = set()
+    self.cursor.execute(query1)
+    rows = self.cursor.fetchall()
+    if self.options.verbose:
+      Logger.debug(query1 + "\n")
+    if rows and len(rows) > 0:
+      host_names = set([row[0] for row in rows if len(row) == 1])
+      Logger.debug("Hosts: {0}".format(", ".join(host_names)))
+
+    host_name_to_state = {} # keys should be a subset of host_names
+    hosts_with_repo_version_state_not_in_current = set()
+    self.cursor.execute(query2)
+    rows = self.cursor.fetchall()
+    Logger.debug(query2 + "\n")
+    if rows and len(rows) > 0:
+      for row in rows:
+        if len(row) == 2:
+          host_name = row[0]
+          state = row[1]
+          host_name_to_state[host_name] = state
+          if state.upper() != "CURRENT":
+            hosts_with_repo_version_state_not_in_current.add(host_name)
+
+    host_names_with_version = set(host_name_to_state.keys())
+    host_names_without_version = host_names - host_names_with_version
+
+    if len(host_names) > 0:
+      if len(host_names_without_version) > 0:
+        Logger.error("{0} host(s) do not have a Host Version for Repo Version {1}.\n" \
+                     "Host(s):\n{2}\n".
+                     format(len(host_names_without_version), version_name, ", ".join(host_names_without_version)))
+
+      if len(hosts_with_repo_version_state_not_in_current) > 0:
+        Logger.error("{0} host(s) have a Host Version for Repo Version {1} but the state is not CURRENT.\n" \
+                     "Host(s):\n{2}\n".
+                     format(len(hosts_with_repo_version_state_not_in_current), version_name, ", ".join(hosts_with_repo_version_state_not_in_current)))
+
+      if len(host_names_without_version) == 0 and len(hosts_with_repo_version_state_not_in_current) == 0:
+        Logger.info("Found {0} host(s) in the cluster, and all have a Host Version of CURRENT for " \
+                    "Repo Version {1}. Things look good.\n".format(len(host_names), version_name))
+      else:
+        Logger.error("Make sure that all of these hosts are heartbeating, that they have the packages installed, the\n" \
+          "hdp-select symlinks are correct, and that the services on these hosts have been restarted.\n")
+    pass
+
+  def check_repo_versions(self):
+    """
+    Check that all display names are unique, that all versions are unique.
+    """
+    query = "SELECT version, display_name FROM repo_version;"
+    self.cursor.execute(query)
+    rows = self.cursor.fetchall()
+    if rows is None:
+      Logger.warning("Did not find any Repo Versions, nothing to do. Query: {0}".format(query))
+      return
+
+    if len(rows) == 0:
+      Logger.info("There are no Repo Versions, nothing to do.")
+    elif len(rows) > 0:
+      # Dictionary from the key to the number of occurrences
+      version_name_to_count = {}
+      display_name_to_count = {}
+      for row in rows:
+        if len(row) == 2:
+          version = row[0]
+          display_name = row[1]
+
+          if version in version_name_to_count:
+            version_name_to_count[version] += 1
+          else:
+            version_name_to_count[version] = 1
+
+          if display_name in display_name_to_count:
+            display_name_to_count[display_name] += 1
+          else:
+            display_name_to_count[display_name] = 1
+
+      # If multiple occurrences are found, report a problem.
+      problematic_versions = set()
+      for k, v in version_name_to_count.iteritems():
+        if v > 1:
+          problematic_versions.add(k)
+
+      problematic_display_names = set()
+      for k, v in display_name_to_count.iteritems():
+        if v > 1:
+          problematic_display_names.add(k)
+
+      found_error = False
+      if len(problematic_versions) > 0:
+        found_error = True
+        Logger.error("The following version(s) exist multiple times in the Repo Versions: {0}".format(", ".join(problematic_versions)))
+      if len(problematic_display_names) > 0:
+        found_error = True
+        Logger.error("The following display name(s) exist multiple times in the Repo Versions: {0}".format(", ".join(problematic_display_names)))
+
+      if found_error:
+        Logger.error("You will have to correct this error manually by ensuring uniqueness.")
+      else:
+        Logger.info("Looks good; all of the Repo Versions have unique display names and versions.")
+    pass
+
+  def check_installation(self):
+    """
+    If any Cluster Versions are stuck in INSTALLING, ask the user if they want to
+    retry (transition to INSTALL_FAILED), or force it (transition to INSTALLED).
+    """
+    query = "SELECT rv.repo_version_id, rv.version, rv.display_name, cv.state FROM cluster_version cv JOIN repo_version rv ON cv.repo_version_id = rv.repo_version_id ORDER BY rv.repo_version_id;"
+    self.cursor.execute(query)
+    rows = self.cursor.fetchall()
+    if rows is None or len(rows) == 0:
+      Logger.warning("Did not find any Cluster Versions, nothing to do. Query: {0}".format(query))
+      return
+
+    for row in rows:
+      if len(row) == 4:
+        repo_version_id = row[0]
+        version = row[1]
+        display_name = row[2]
+        state = row[3]
+
+        if state.upper() == "INSTALLING":
+          question = "Repo Version {0} ({1}) is stuck in INSTALLING. ".format(display_name, version) + \
+          "Do you want to,\n" \
+          "1) Leave as is (DEFAULT)\n" \
+          "2) Retry installation\n" \
+          "3) Force it to Installed (WARNING: Only if you are certain you have installed the bits already!!!)\n"
+
+          answer = self.ask_question(question, numeric=True)
+          desired_state = None
+          if answer is None or answer == 1:
+            Logger.info("  Ignoring, moving on...")
+          elif answer == 2:
+            Logger.info("  Retrying installation")
+            desired_state = "INSTALL_FAILED"
+          elif answer == 3:
+            Logger.info("  Forcing to INSTALLED")
+            desired_state = "INSTALLED"
+
+          if desired_state is not None:
+            statement1 = "UPDATE cluster_version SET state = '{0}' WHERE repo_version_id={1};".format(desired_state, repo_version_id)
+            statement2 = "UPDATE host_version SET state = '{0}' WHERE repo_version_id={1};".format(desired_state, repo_version_id)
+
+            Logger.info("Will run update statements:\n{0}\n{1}\n".format(statement1, statement2))
+            self.cursor.execute(statement1)
+            self.cursor.execute(statement2)
+            self.conn.commit()
+        elif state.upper() in ["UPGRADING", "UPGRADED"]:
+          self.__handle_cluster_version_in_upgrading_or_upgraded(repo_version_id, version, display_name, state)
+    pass
+
+  def __handle_cluster_version_in_upgrading_or_upgraded(self, repo_version_id, version, display_name, state):
+    """
+    In some cases, installing hdp-select can flip the symlinks, so that restarting components will actually
+    cause them to start on the newer version without the user's knowledge.
+    In this case, the version in UPGRADING/UPGRADED has not been used in an upgrade yet, so we must revert
+    to a good known state if the user intends on making an automated RU.
+    Before doing so, the user will have to call "hdp-select set all <current_version>" on all affected hosts.
+    """
+    attempted_rolling_upgrade = False
+
+    query1 = "SELECT COUNT(*) FROM upgrade WHERE to_version = '{0}';".format(version)
+    self.cursor.execute(query1)
+    row = self.cursor.fetchone()
+    if row and len(row) == 1:
+      attempted_rolling_upgrade = int(row[0]) > 0
+    else:
+      Logger.error("Unable to run query: {0}".format(query1))
+      return
+
+    if not attempted_rolling_upgrade:
+      host_names_already_upgrading = set()
+
+      if self.compare_versions(self.ambari_version, "2.1.0") < 0:
+        query2 = "SELECT hv.host_name, hv.state FROM host_version hv WHERE hv.repo_version_id = {0} AND hv.state IN ('UPGRADING', 'UPGRADED');".format(repo_version_id)
+      else:
+        query2 = "SELECT h.host_name, hv.state FROM hosts h JOIN host_version hv ON h.host_id = hv.host_id WHERE hv.repo_version_id = {0} AND hv.state IN ('UPGRADING', 'UPGRADED');".format(repo_version_id)
+
+      self.cursor.execute(query2)
+      rows = self.cursor.fetchall()
+      if rows and len(rows) > 0:
+        host_names_already_upgrading = set([row[0] for row in rows if len(row) == 2])
+
+      question = None
+      answer = None
+      if len(host_names_already_upgrading) > 0:
+        question = "We noticed that you did not launch an Automated Rolling Upgrade to version {0},\n" \
+                   "but its state is {1} and some host(s) are already upgrading to it.\n" \
+                   "Did you intend to perform a manual upgrade already?\n" \
+                   "1) Yes, I already started changing versions manually. Leave it as is.\n" \
+                   "2) No, should not have started yet. Please revert it back to INSTALLED.\n".format(version, state)
+      else:
+        question = "We noticed that you did not launch an Automated Rolling Upgrade to version {0},\n" \
+                   "but its state is {1}.\n" \
+                   "Did you intend to perform a manual upgrade?\n" \
+                   "1) Yes, I already started changing versions manually. Leave it as is.\n" \
+                   "2) No, should not have started yet. Please revert it back to INSTALLED.\n".format(version, state)
+
+      while answer is None:
+        answer = self.ask_question(question, numeric=True)
+        if answer == 1:
+          Logger.info("Ok\n")
+        elif answer == 2:
+          current_cluster_version = self.__get_current_cluster_version()
+          if current_cluster_version is not None and "version" in current_cluster_version:
+            Logger.info("IMPORTANT: We are forcing the Cluster Version and Host Versions back to INSTALLED. Before starting ambari-server,\n" \
+                        "please ensure that all hosts are indeed on the original version, by running \"hdp-select set all {0}\" on each host.\n".format(current_cluster_version["version"]))
+          else:
+            Logger.error("Unable to find a Cluster Version that is CURRENT.\n")
+            return
+
+          statement1 = "UPDATE cluster_version SET state = 'INSTALLED' WHERE repo_version_id = {0} AND state = '{1}';".format(repo_version_id, state)
+          statement2 = "UPDATE host_version SET state = 'INSTALLED' WHERE repo_version_id = {0} AND state IN ('UPGRADING', 'UPGRADED');".format(repo_version_id)
+          statement3 = "UPDATE hostcomponentstate set version = '{0}' WHERE version = '{1}';".format(current_cluster_version["version"], version)
+          statement4 = "UPDATE hostcomponentstate set upgrade_state = 'NONE';"
+
+          Logger.info("Will run update statements:\n{0}\n{1}\n{2}\n{3}\n".format(statement1, statement2, statement3, statement4))
+          self.cursor.execute(statement1)
+          self.cursor.execute(statement2)
+          self.cursor.execute(statement3)
+          self.cursor.execute(statement4)
+          self.conn.commit()
+        else:
+          Logger.error("Invalid option \"{0}\", please try again.\n".format(answer))
+          answer = None
+
+  def __get_current_cluster_version(self):
+    """
+    :return: Return a dictionary with stats from the Cluster Version whose state is CURRENT. If one doesn't exist, return None.
+    """
+    query = "SELECT rv.repo_version_id, rv.version, rv.display_name FROM cluster_version cv JOIN repo_version rv ON cv.repo_version_id = rv.repo_version_id WHERE cv.state = 'CURRENT';"
+    self.cursor.execute(query)
+    row = self.cursor.fetchone()
+    if row and len(row) == 3:
+      return {"repo_version_id": row[0],
+              "version": row[1],
+              "display_name": row[2]}
+    Logger.warning("Unable to run query: {0}\n".format(query))
+    return None
+
+  def finalize(self):
+    """
+    Check any versions in UPGRADING/UPGRADED, ask the user to pick the one they want CURRENT, and all others
+    will go to INSTALLED.
+    This will also update the hostcomponentstate with that version for all components that advertise a version.
+    Further, it will set the upgrade_state to NONE.
+    Next, it will update the host_version and cluster_version accordingly.
+    And pick the CURRENT cluster version, and change it to INSTALLED.
+    If this was a manual stack upgrade, will need to fix the Current and Desired Stack version.
+    If there are any pending RU requests, will mark them as ABORTED.
+    """
+    query = "SELECT rv.repo_version_id, rv.version, rv.display_name, cv.state FROM cluster_version cv JOIN repo_version rv ON cv.repo_version_id = rv.repo_version_id ORDER BY rv.repo_version_id;"
+    self.cursor.execute(query)
+    rows = self.cursor.fetchall()
+    if rows is None or len(rows) == 0:
+      Logger.warning("Did not find any Cluster Versions, nothing to do. Query: {0}".format(query))
+      return
+
+    current_cluster_version = self.__get_current_cluster_version()
+    if current_cluster_version is None or "version" not in current_cluster_version:
+      Logger.error("Unable to find a Cluster Version that is CURRENT. Please resolve this with help from support.\n")
+      return
+
+    question1 = "Which Cluster Version would you like to make CURRENT?\n"
+    for i in range(0, len(rows)):
+      version = rows[i][1]
+      state = rows[i][3]
+      question1 += "{0}) {1} in state {2}\n".format(i+1, version, state)
+
+    answer1 = None
+    while answer1 is None:
+      answer1 = self.ask_question(question1, numeric=True)
+      if answer1 >= 1 and answer1 <= len(rows):
+        desired_current_version = rows[answer1 - 1][1]
+
+        if current_cluster_version["version"] == desired_current_version:
+          Logger.info("Version {0} is already CURRENT, nothing to do.\n".format(desired_current_version))
+        else:
+          question2 = "Will make version {0} CURRENT, and change version {1} to INSTALLED. Is that your final answer?\n" \
+                      "1) Yes. I already ran or plan to run \"HDFS Finalize\".\n" \
+                      "2) No\n".format(desired_current_version, current_cluster_version["version"])
+          answer2 = None
+          while answer2 is None:
+            answer2 = self.ask_question(question2, numeric=True)
+            if answer2 == 1:
+              Logger.info("Your wish is my command!\n")
+              self.__finalize_version(current_cluster_version, desired_current_version)
+            elif answer2 == 2:
+              Logger.info("Ok, we'll leave it as is.\n")
+            else:
+              Logger.error("Invalid option \"{0}\", please try again.\n".format(answer2))
+              answer2 = None
+      else:
+        Logger.error("Invalid option \"{0}\", please try again.\n".format(answer1))
+        answer1 = None
+
+  def __finalize_version(self, current_cluster_version, desired_cluster_version):
+    """
+    Finalize a version by marking it as CURRENT
+    :param current_cluster_version: Dictionary with information about the current version.
+    :param desired_cluster_version: String of version that should be marked as CURRENT.
+    """
+    if current_cluster_version is None or "version" not in current_cluster_version:
+      Logger.error("Could not finalize to version {0} because did not receive a Cluster Version that is CURRENT.".format(desired_cluster_version))
+      return
+
+    if current_cluster_version["version"] == desired_cluster_version:
+      Logger.info("Asking to finalize to a version that is already CURRENT, nothing to do.\n")
+      return
+
+    # Get the repo_version_id for the desired object
+    repo_version = self.__get_repo_version(desired_cluster_version)
+    if repo_version is None or "repo_version_id" not in repo_version:
+      Logger.error("Unable to query Repo Version {0}.\n".format(desired_cluster_version))
+      return
+
+    statements = []
+    statements.append("UPDATE cluster_version SET state = 'CURRENT' WHERE repo_version_id = {0};".format(repo_version["repo_version_id"]))
+    statements.append("UPDATE host_version SET state = 'CURRENT' WHERE repo_version_id = {0};".format(repo_version["repo_version_id"]))
+    statements.append("UPDATE hostcomponentstate set version = '{0}' WHERE version NOT IN ('{0}', 'UNKNOWN');".format(desired_cluster_version))
+    statements.append("UPDATE hostcomponentstate set upgrade_state = 'NONE';")
+    statements.append("UPDATE cluster_version SET state = 'INSTALLED' WHERE repo_version_id = {0} AND state = 'CURRENT';".format(current_cluster_version["repo_version_id"]))
+    statements.append("UPDATE host_version SET state = 'INSTALLED' WHERE repo_version_id = {0} AND state = 'CURRENT';".format(current_cluster_version["repo_version_id"]))
+
+    upgrade_in_progress = self.__get_last_upgrade_id_for_repo_version(desired_cluster_version)
+    additional_statements = self.__get_update_statements_for_tasks_in_progress(upgrade_in_progress)
+    statements += additional_statements
+
+    # Upgrade the stack
+    if self.compare_versions(self.ambari_version, "2.1.0") < 0:
+      '''
+      # In Ambari 2.0, it was not possible to do major stack upgrades, so several columns will retain HDP 2.2 as the stackVersion.
+      E.g.,
+      _Table____________________|_Column________________|_Value____________________________________
+      clusters                  | desired_stack_version | {"stackName":"HDP","stackVersion":"2.2"}
+      clusterstate              | current_stack_version | {"stackName":"HDP","stackVersion":"2.2"}
+      hostcomponentstate        | current_stack_version | {"stackName":"HDP","stackVersion":"2.2"}
+      hostcomponentdesiredstate | desired_stack_version | {"stackName":"HDP","stackVersion":"2.2"}
+      '''
+      pass
+    else:
+      statements.append("UPDATE clusters SET desired_stack_id = {0} WHERE cluster_id = {1};".format(repo_version["stack_id"], self.cluster_id))
+      statements.append("UPDATE clusterstate SET current_stack_id = {0} WHERE cluster_id = {1};".format(repo_version["stack_id"], self.cluster_id))
+
+    Logger.info("Will run update statements:\n" + "\n".join(statements) + "\n")
+
+    try:
+      for statement in statements:
+        self.cursor.execute(statement)
+      self.conn.commit()
+      Logger.info("Success, enjoy the rest of your day!\n")
+    except Exception, e:
+      Logger.error("Unable to run update statements to finalize version {0}. Error: {1}\n".format(desired_cluster_version, e.message))
+
+  def __get_repo_version(self, version):
+    """
+    Get the Repo Version object whose version matches.
+    :param version: Version string, e.g., 2.3.0.0-1234
+    :return: Return a dictionary with the parameters of the object.
+    """
+    if self.compare_versions(self.ambari_version, "2.1.0") < 0:
+      query = "SELECT repo_version_id, version, display_name FROM repo_version WHERE version = '{0}';".format(version)
+    else:
+      query = "SELECT repo_version_id, version, display_name, stack_id FROM repo_version WHERE version = '{0}';".format(version)
+
+    self.cursor.execute(query)
+    rows = self.cursor.fetchall()
+    if rows is not None and len(rows) == 1:
+      obj = {}
+      if len(rows[0]) >= 3:
+        obj = {"repo_version_id": rows[0][0],
+               "version": rows[0][1],
+               "display_name": rows[0][2]}
+      if len(rows[0]) >= 4:
+        obj["stack_id"] = rows[0][3]
+      return obj
+    Logger.warning("Unable to run query, or generated multiple rows: {0}".format(query))
+    return None
+
+  def __get_last_upgrade_id_for_repo_version(self, version):
+    """
+    Get the most recent upgrade_id for the Upgrade record whose to_version matches the version.
+    :param version: Version string, e.g., 2.3.0.0-1234
+    :return: Return a dictionary with the parameters of the object, or None if no matching upgrade is found.
+    """
+    query = "SELECT u.upgrade_id, u.direction, u.from_version, u.to_version FROM upgrade u JOIN repo_version rv " \
+            "ON u.to_version = rv.version JOIN cluster_version cv ON rv.repo_version_id = cv.repo_version_id AND " \
+            "cv.cluster_id = {0} AND u.to_version = '{1}' AND cv.state IN ('UPGRADING', 'UPGRADED') " \
+            "ORDER BY u.upgrade_id DESC;".format(self.cluster_id, version)
+
+    self.cursor.execute(query)
+    rows = self.cursor.fetchall()
+    if rows is not None and len(rows) >= 1:
+      obj = {}
+      if len(rows[0]) == 4:
+        obj = {"upgrade_id": rows[0][0],
+               "direction": rows[0][1],
+               "from_version": rows[0][2],
+               "to_version": rows[0][3]}
+        return obj
+    return None
+
+  def __get_update_statements_for_tasks_in_progress(self, upgrade_in_progress):
+    """
+    When an RU is in progress, need to abort any host_role_command records for it that are still PENDING, QUEUED, IN_PROGRESS, HOLDING.
+    :param upgrade_in_progress: Dictionary of the upgrade in progress, which contains an "upgrade_id"
+    :return: List of update statements to run, or empty list if no statements could be generated.
+    """
+    statements = []
+    if upgrade_in_progress and "upgrade_id" in upgrade_in_progress:
+      # There should only be one request, which is used for all of the host_role_command records as part of either the
+      # upgrade or downgrade.
+      query = "SELECT DISTINCT(hrc.request_id) FROM upgrade u " \
+              "JOIN upgrade_group g ON u.upgrade_id = g.upgrade_id " \
+              "JOIN upgrade_item i ON g.upgrade_group_id = i.upgrade_group_id " \
+              "JOIN host_role_command hrc ON i.stage_id = hrc.stage_id AND u.request_id = hrc.request_id " \
+              "WHERE u.upgrade_id = {0} LIMIT 1;".format(upgrade_in_progress["upgrade_id"])
+
+      self.cursor.execute(query)
+      row = self.cursor.fetchone()
+      if row and len(row) == 1:
+        request_id = row[0]
+        statements.append("UPDATE host_role_command SET status = 'ABORTED' WHERE request_id = {0} AND status IN ('PENDING', 'QUEUED', 'IN_PROGRESS', 'HOLDING');".format(request_id))
+    return statements
+
+
+if __name__ == '__main__':
+  magician = RUMagician(sys.argv)
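
Reviewer sketch (not part of the commit): the host comparison in check_all_hosts_current could be factored into a pure function so it can be exercised without a database. The name classify_hosts and its arguments are hypothetical; the logic is only an approximation of what the method above does with query1 and query2 results.

```python
def classify_hosts(cluster_hosts, host_version_rows, expected_state="CURRENT"):
    """
    cluster_hosts: iterable of host names in the cluster (query1 results).
    host_version_rows: iterable of (host_name, state) tuples (query2 results).
    Returns (hosts with no host_version row, hosts whose state is not expected_state).
    """
    host_name_to_state = {}
    for host_name, state in host_version_rows:
        host_name_to_state[host_name] = state

    # Hosts in the cluster that have no Host Version for this Repo Version.
    missing = set(cluster_hosts) - set(host_name_to_state)

    # Hosts that have a Host Version, but in some other state.
    wrong_state = set(name for name, state in host_name_to_state.items()
                      if state.upper() != expected_state)
    return missing, wrong_state


if __name__ == "__main__":
    missing, wrong_state = classify_hosts(
        ["h1", "h2", "h3"],
        [("h1", "CURRENT"), ("h2", "INSTALLED")])
    print(sorted(missing), sorted(wrong_state))
```

Keeping the set arithmetic separate from the cursor calls would also make it straightforward to reuse for the UPGRADING/UPGRADED check.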

http://git-wip-us.apache.org/repos/asf/ambari/blob/42a44e4a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 43db93a..d14d98c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -301,6 +301,8 @@
             <exclude>contrib/ambari-scom/ambari-scom-server/pass.txt</exclude>
             <exclude>contrib/ambari-scom/*/rat.txt</exclude>
             <exclude>contrib/ambari-scom/metrics-sink/target/**</exclude>
+            <exclude>contrib/ru_magician/lgpl_license.txt</exclude>
+            <exclude>contrib/ru_magician/README.md</exclude>
             <exclude>contrib/views/*/target/**</exclude>
             <exclude>contrib/views/hive/src/main/resources/ui/hive-web/vendor/codemirror/**</exclude>
             <exclude>contrib/views/hive/src/main/resources/ui/hive-web/.bowerrc</exclude>
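
Reviewer sketch (not part of the commit): __finalize_version in ru_magician.py logs an error when one of its UPDATE statements fails, but it does not roll back. A minimal illustration of running the statements as one all-or-nothing unit with bound parameters follows. It uses the standard-library sqlite3 module purely as a stand-in for the real database driver, so the "?" placeholder style and the helper names run_in_transaction and demo are assumptions, not what the script uses.

```python
import sqlite3


def run_in_transaction(conn, statements):
    """Execute (sql, params) pairs; commit all of them, or roll back all of them."""
    cursor = conn.cursor()
    try:
        for sql, params in statements:
            cursor.execute(sql, params)
        conn.commit()
        return True
    except sqlite3.Error:
        # Undo any statements that already ran in this transaction.
        conn.rollback()
        return False


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE cluster_version (repo_version_id INTEGER, state TEXT)")
    conn.executemany("INSERT INTO cluster_version VALUES (?, ?)",
                     [(1, "CURRENT"), (2, "UPGRADED")])
    conn.commit()

    # The second statement targets a table that does not exist, so it fails
    # and the first UPDATE is rolled back with it.
    ok = run_in_transaction(conn, [
        ("UPDATE cluster_version SET state = ? WHERE repo_version_id = ?", ("CURRENT", 2)),
        ("UPDATE no_such_table SET state = ?", ("INSTALLED",)),
    ])
    states = dict(conn.execute("SELECT repo_version_id, state FROM cluster_version"))
    return ok, states


if __name__ == "__main__":
    print(demo())
```

Other DB-API drivers use different placeholder styles (for example "%s"), so the statements would need adapting to whichever driver the script connects with.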

