ambari-commits mailing list archives

From alejan...@apache.org
Subject ambari git commit: AMBARI-12252. Prevent datanode from creating an HDFS datadir when drive becomes unmounted (alejandro)
Date Thu, 02 Jul 2015 20:33:28 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 c245df22d -> b4b139b3f


AMBARI-12252. Prevent datanode from creating an HDFS datadir when drive becomes unmounted
(alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b4b139b3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b4b139b3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b4b139b3

Branch: refs/heads/branch-2.1
Commit: b4b139b3f313dadc4f182e138abba8eb13799d56
Parents: c245df2
Author: Alejandro Fernandez <afernandez@hortonworks.com>
Authored: Thu Jul 2 13:33:03 2015 -0700
Committer: Alejandro Fernandez <afernandez@hortonworks.com>
Committed: Thu Jul 2 13:33:21 2015 -0700

----------------------------------------------------------------------
 .../resource_management/TestDatanodeHelper.py   | 191 +++++++++++++++++++
 .../resource_management/TestFileSystem.py       |  15 +-
 ambari-agent/src/test/python/unitTests.py       |  62 +++++-
 .../libraries/functions/dfs_datanode_helper.py  |  81 +++++---
 4 files changed, 312 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
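
At its core, the fix gates directory creation on a comparison between each
data dir's last known mount point and its current one. A minimal sketch of
that decision rule (names here are hypothetical; the committed implementation
lives in handle_dfs_data_dir below):

    # Illustrative sketch only; function and argument names are hypothetical.
    def may_create_data_dir(history_exists, last_mount, curr_mount):
        if not history_exists:
            return True   # first run: no mount history to compare against
        if last_mount is None:
            return True   # no record for this dir, so create it to be safe
        # Allow creation if the dir already lived on the root partition, or if
        # it is currently mounted on a real (non-root) drive. If it used to be
        # on a drive and is now on /, the drive became unmounted: do NOT create.
        return last_mount == "/" or (curr_mount is not None and curr_mount != "/")

    print may_create_data_dir(True, "/dev2", "/")   # False: drive became unmounted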


http://git-wip-us.apache.org/repos/asf/ambari/blob/b4b139b3/ambari-agent/src/test/python/resource_management/TestDatanodeHelper.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestDatanodeHelper.py b/ambari-agent/src/test/python/resource_management/TestDatanodeHelper.py
new file mode 100644
index 0000000..e348cc4
--- /dev/null
+++ b/ambari-agent/src/test/python/resource_management/TestDatanodeHelper.py
@@ -0,0 +1,191 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import os
+import logging
+from unittest import TestCase
+from mock.mock import Mock, MagicMock, patch
+
+from resource_management.libraries.functions import dfs_datanode_helper
+from resource_management.core.logger import Logger
+
+
+class StubParams(object):
+  """
+  Dummy class to fake params where params.x performs a get on params.dict["x"]
+  """
+  def __init__(self):
+    self.dict = {}
+
+  def __getattr__(self, name):
+    return self.dict[name]
+
+  def __repr__(self):
+    name = self.__class__.__name__
+    mocks = set(dir(self))
+    mocks = [x for x in mocks if not str(x).startswith("__")]   # Exclude private methods
+    return "<StubParams: {0}; mocks: {1}>".format(name, str(mocks))
+
+
+def fake_create_dir(directory, other):
+  """
+  Fake function used as function pointer.
+  """
+  print "Fake function to create directory {0}".format(directory)
+
+
+class TestDatanodeHelper(TestCase):
+  """
+  Test the functionality of dfs_datanode_helper.py
+  """
+  logger = logging.getLogger('TestDatanodeHelper')
+
+  grid0 = "/grid/0/data"
+  grid1 = "/grid/1/data"
+  grid2 = "/grid/2/data"
+
+  params = StubParams()
+  params.data_dir_mount_file = "/etc/hadoop/conf/dfs_data_dir_mount.hist"
+  params.dfs_data_dir = "{0},{1},{2}".format(grid0, grid1, grid2)
+
+
+  @patch.object(Logger, "info")
+  @patch.object(Logger, "error")
+  @patch.object(dfs_datanode_helper, "_write_data_dir_to_mount_in_file")
+  def test_normalized(self, mock_write_data_dir_to_file, log_error, log_info):
+    """
+    Test that the data dirs are normalized by removing leading and trailing whitespace, and that letter case is preserved.
+    """
+    params = StubParams()
+    params.data_dir_mount_file = "/etc/hadoop/conf/dfs_data_dir_mount.hist"
+    params.dfs_data_dir = "/grid/0/data  ,  /grid/1/data  ,/GRID/2/Data/"
+
+    # Function under test
+    dfs_datanode_helper.handle_dfs_data_dir(fake_create_dir, params, update_cache=False)
+
+    for (name, args, kwargs) in log_info.mock_calls:
+      print args[0]
+    for (name, args, kwargs) in log_error.mock_calls:
+      print args[0]
+
+    log_info.assert_any_call("Forcefully creating directory: /grid/0/data")
+    log_info.assert_any_call("Forcefully creating directory: /grid/1/data")
+    log_info.assert_any_call("Forcefully creating directory: /GRID/2/Data/")
+
+    self.assertEquals(0, log_error.call_count)
+
+
+  @patch.object(Logger, "info")
+  @patch.object(Logger, "error")
+  @patch.object(dfs_datanode_helper, "_write_data_dir_to_mount_in_file")
+  @patch.object(dfs_datanode_helper, "get_mount_point_for_dir")
+  @patch.object(os.path, "isdir")
+  def test_save_mount_points(self, mock_os_isdir, mock_get_mount_point, mock_write_data_dir_to_mount_in_file, log_error, log_info):
+    """
+    Test when all mounts are on root.
+    """
+    mock_get_mount_point.side_effect = ["/", "/", "/"] * 2
+    mock_os_isdir.side_effect = [False, False, False] + [True, True, True]
+    mock_write_data_dir_to_mount_in_file.return_value = True
+
+    # Function under test
+    dfs_datanode_helper.handle_dfs_data_dir(fake_create_dir, self.params, update_cache=False)
+
+    for (name, args, kwargs) in log_info.mock_calls:
+      print args[0]
+
+    for (name, args, kwargs) in log_error.mock_calls:
+      print args[0]
+
+    self.assertEquals(0, log_error.call_count)
+    mock_write_data_dir_to_mount_in_file.assert_called_once_with(self.params, {self.grid0: "/", self.grid1: "/", self.grid2: "/"})
+
+
+  @patch.object(Logger, "info")
+  @patch.object(Logger, "error")
+  @patch.object(dfs_datanode_helper, "_get_data_dir_to_mount_from_file")
+  @patch.object(dfs_datanode_helper, "_write_data_dir_to_mount_in_file")
+  @patch.object(dfs_datanode_helper, "get_mount_point_for_dir")
+  @patch.object(os.path, "isdir")
+  @patch.object(os.path, "exists")
+  def test_grid_becomes_unmounted(self, mock_os_exists, mock_os_isdir, mock_get_mount_point, mock_write_data_dir_to_mount_in_file, mock_get_data_dir_to_mount_from_file, log_error, log_info):
+    """
+    Test when grid2 becomes unmounted
+    """
+    mock_os_exists.return_value = True    # Indicate that history file exists
+
+    # Initially, all grids were mounted
+    mock_get_data_dir_to_mount_from_file.return_value = {self.grid0: "/dev0", self.grid1: "/dev1", self.grid2: "/dev2"}
+
+    # Grid2 then becomes unmounted
+    mock_get_mount_point.side_effect = ["/dev0", "/dev1", "/"] * 2
+    mock_os_isdir.side_effect = [False, False, False] + [True, True, True]
+    mock_write_data_dir_to_mount_in_file.return_value = True
+
+    # Function under test
+    dfs_datanode_helper.handle_dfs_data_dir(fake_create_dir, self.params, update_cache=False)
+
+    for (name, args, kwargs) in log_info.mock_calls:
+      print args[0]
+
+    error_logs = []
+    for (name, args, kwargs) in log_error.mock_calls:
+      error_logs.append(args[0])    # this is a one-tuple
+      #print args[0]
+    error_msg = "".join(error_logs)
+
+    self.assertEquals(1, log_error.call_count)
+    self.assertTrue("Directory /grid/2/data does not exist and became unmounted from /dev2" in error_msg)
+
+    # Notice that grid2 is still written with its original mount point because an error occurred on it
+    mock_write_data_dir_to_mount_in_file.assert_called_once_with(self.params, {self.grid0: "/dev0", self.grid1: "/dev1", self.grid2: "/dev2"})
+
+
+  @patch.object(Logger, "info")
+  @patch.object(Logger, "error")
+  @patch.object(dfs_datanode_helper, "_get_data_dir_to_mount_from_file")
+  @patch.object(dfs_datanode_helper, "_write_data_dir_to_mount_in_file")
+  @patch.object(dfs_datanode_helper, "get_mount_point_for_dir")
+  @patch.object(os.path, "isdir")
+  @patch.object(os.path, "exists")
+  def test_grid_becomes_remounted(self, mock_os_exists, mock_os_isdir, mock_get_mount_point, mock_write_data_dir_to_mount_in_file, mock_get_data_dir_to_mount_from_file, log_error, log_info):
+    """
+    Test when grid2 becomes remounted
+    """
+    mock_os_exists.return_value = True    # Indicate that history file exists
+
+    # Initially, all grids were mounted
+    mock_get_data_dir_to_mount_from_file.return_value = {self.grid0: "/dev0", self.grid1: "/dev1", self.grid2: "/"}
+
+    # Grid2 then becomes remounted
+    mock_get_mount_point.side_effect = ["/dev0", "/dev1", "/dev2"] * 2
+    mock_os_isdir.side_effect = [False, False, False] + [True, True, True]
+    mock_write_data_dir_to_mount_in_file.return_value = True
+
+    # Function under test
+    dfs_datanode_helper.handle_dfs_data_dir(fake_create_dir, self.params, update_cache=False)
+
+    for (name, args, kwargs) in log_info.mock_calls:
+      print args[0]
+
+    for (name, args, kwargs) in log_error.mock_calls:
+      print args[0]
+
+    self.assertEquals(0, log_error.call_count)
+
+    # Notice that grid2 is now written with its new mount point to prevent a regression
+    mock_write_data_dir_to_mount_in_file.assert_called_once_with(self.params, {self.grid0: "/dev0", self.grid1: "/dev1", self.grid2: "/dev2"})
\ No newline at end of file
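
StubParams above stands in for the module-level params import that
dfs_datanode_helper previously performed. Because only __getattr__ is
overridden, a value can either be served from the backing dict or set as an
ordinary attribute; __getattr__ is consulted only when normal lookup fails.
A quick illustration (values here are hypothetical):

    p = StubParams()
    p.dict["dfs_data_dir"] = "/grid/0/data"     # served through __getattr__
    p.data_dir_mount_file = "/tmp/mount.hist"   # plain attribute; bypasses the dict
    print p.dfs_data_dir                        # -> /grid/0/data
    print p.data_dir_mount_file                 # -> /tmp/mount.hist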

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4b139b3/ambari-agent/src/test/python/resource_management/TestFileSystem.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestFileSystem.py b/ambari-agent/src/test/python/resource_management/TestFileSystem.py
index 91fd71d..4e0eb63 100644
--- a/ambari-agent/src/test/python/resource_management/TestFileSystem.py
+++ b/ambari-agent/src/test/python/resource_management/TestFileSystem.py
@@ -22,6 +22,7 @@ from mock.mock import patch
 
 from resource_management.libraries.functions import file_system
 import resource_management.core.providers.mount
+from resource_management.core.logger import Logger
 
 
 class TestFileSystem(TestCase):
@@ -78,7 +79,9 @@ class TestFileSystem(TestCase):
 
     return mount_val
 
-  def test_invalid(self):
+  @patch.object(Logger, "info")
+  @patch.object(Logger, "error")
+  def test_invalid(self, log_error, log_info):
     """
     Testing when parameters are invalid or missing.
     """
@@ -91,9 +94,10 @@ class TestFileSystem(TestCase):
     mount_point = file_system.get_mount_point_for_dir("  ")
     self.assertEqual(mount_point, None)
 
-
+  @patch.object(Logger, "info")
+  @patch.object(Logger, "error")
   @patch('resource_management.core.providers.mount.get_mounted')
-  def test_at_root(self, mounted_mock):
+  def test_at_root(self, mounted_mock, log_error, log_info):
     """
     Testing when the path is mounted on the root.
     """
@@ -102,9 +106,10 @@ class TestFileSystem(TestCase):
     mount_point = file_system.get_mount_point_for_dir("/hadoop/hdfs/data")
     self.assertEqual(mount_point, "/")
 
-
+  @patch.object(Logger, "info")
+  @patch.object(Logger, "error")
   @patch('resource_management.core.providers.mount.get_mounted')
-  def test_at_drive(self, mounted_mock):
+  def test_at_drive(self, mounted_mock, log_error, log_info):
     """
     Testing when the path is mounted on a virtual file system not at the root.
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4b139b3/ambari-agent/src/test/python/unitTests.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/unitTests.py b/ambari-agent/src/test/python/unitTests.py
index b6f8411..83b5a0f 100644
--- a/ambari-agent/src/test/python/unitTests.py
+++ b/ambari-agent/src/test/python/unitTests.py
@@ -18,7 +18,28 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
+"""
+SAMPLE USAGE:
+
+python unitTests.py
+python unitTests.py NameOfFile.py
+python unitTests.py NameOfFileWithoutExtension  (this will append * to the end, so it can match other file names too)
+
+SETUP:
+To run in Linux from command line,
+cd to this same directory. Then make sure PYTHONPATH is correct.
+
+export PYTHONPATH=$PYTHONPATH:$(pwd)/ambari-agent/src/test/python:
+$(pwd)/ambari-common/src/test/python:
+$(pwd)/ambari-agent/src/test/python/ambari_agent:
+$(pwd)/ambari-common/src/main/python:
+$(pwd)/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/files:
+$(pwd)/ambari-agent/src/test/python/resource_management:
+$(pwd)/ambari-common/src/main/python/ambari_jinja2
+"""
+
 import unittest
+import fnmatch
 from os.path import isdir
 import logging
 from only_for_platform import get_platform, PLATFORM_WINDOWS
@@ -34,6 +55,8 @@ if get_platform() == PLATFORM_WINDOWS:
 else:
   IGNORE_FOLDERS = ["resource_management_windows"]
 
+TEST_MASK = '[Tt]est*.py'
+
 class TestAgent(unittest.TestSuite):
   def run(self, result):
     run = unittest.TestSuite.run
@@ -51,15 +74,39 @@ def parent_dir(path):
 
   return parent_dir
 
+def get_test_files(path, mask=None, recursive=True):
+  """
+  Returns test files for path recursively
+  """
+  # Must convert mask so it can match a file
+  if mask and mask != "" and not mask.endswith("*"):
+    mask = mask + "*"
+
+  file_list = []
+  directory_items = os.listdir(path)
 
-def all_tests_suite():
+  for item in directory_items:
+    add_to_pythonpath = False
+    p = os.path.join(path, item)
+    if os.path.isfile(p):
+      if fnmatch.fnmatch(item, mask):
+        add_to_pythonpath = True
+        file_list.append(item)
+    elif os.path.isdir(p) and p not in IGNORE_FOLDERS:
+      if recursive:
+        file_list.extend(get_test_files(p, mask=mask))
+    if add_to_pythonpath:
+      sys.path.append(path)
 
+  return file_list
+
+
+def all_tests_suite(custom_test_mask):
+  test_mask = custom_test_mask if custom_test_mask else TEST_MASK
 
   src_dir = os.getcwd()
-  files_list = []
-  for directory in os.listdir(src_dir):
-    if os.path.isdir(directory) and not (directory in IGNORE_FOLDERS):
-      files_list += os.listdir(src_dir + os.sep + directory)
+  files_list = get_test_files(src_dir, mask=test_mask)
+
   #TODO Add an option to randomize the tests' execution
   #shuffle(files_list)
   tests_list = []
@@ -84,12 +131,15 @@ def all_tests_suite():
   return unittest.TestSuite([suite])
 
 def main():
+  test_mask = None
+  if len(sys.argv) >= 2:
+    test_mask = sys.argv[1]
 
   logger.info('------------------------------------------------------------------------')
   logger.info('PYTHON AGENT TESTS')
   logger.info('------------------------------------------------------------------------')
   runner = unittest.TextTestRunner(verbosity=2, stream=sys.stdout)
-  suite = all_tests_suite()
+  suite = all_tests_suite(test_mask)
   status = runner.run(suite).wasSuccessful()
 
   if not status:
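
The optional filename argument flows into get_test_files() as a mask, where a
trailing * is appended if missing, so a bare module name becomes an fnmatch
pattern. Roughly (file names here are illustrative):

    import fnmatch

    mask = "TestDatanodeHelper"          # as typed on the command line
    if mask and not mask.endswith("*"):
        mask = mask + "*"                # -> "TestDatanodeHelper*"

    print fnmatch.fnmatch("TestDatanodeHelper.py", mask)   # True
    print fnmatch.fnmatch("TestFileSystem.py", mask)       # False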

http://git-wip-us.apache.org/repos/asf/ambari/blob/b4b139b3/ambari-common/src/main/python/resource_management/libraries/functions/dfs_datanode_helper.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/dfs_datanode_helper.py b/ambari-common/src/main/python/resource_management/libraries/functions/dfs_datanode_helper.py
index ad5a984..a05e162 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/dfs_datanode_helper.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/dfs_datanode_helper.py
@@ -26,14 +26,12 @@ from resource_management.libraries.functions.file_system import get_mount_point_for_dir
 from resource_management.core.logger import Logger
 
 
-def _write_data_dir_to_mount_in_file(new_data_dir_to_mount_point):
+def _write_data_dir_to_mount_in_file(params, new_data_dir_to_mount_point):
   """
   :param new_data_dir_to_mount_point: Dictionary to write to the data_dir_mount_file file, where
   the key is each DFS data dir, and the value is its current mount point.
   :return: Returns True on success, False otherwise.
   """
-  import params
-
   # Overwrite the existing file, or create it if doesn't exist
   if params.data_dir_mount_file:
     try:
@@ -53,12 +51,11 @@ def _write_data_dir_to_mount_in_file(new_data_dir_to_mount_point):
   return True
 
 
-def _get_data_dir_to_mount_from_file():
+def _get_data_dir_to_mount_from_file(params):
   """
   :return: Returns a dictionary by parsing the data_dir_mount_file file,
   where the key is each DFS data dir, and the value is its last known mount point.
   """
-  import params
   data_dir_to_mount = {}
 
   if params.data_dir_mount_file is not None and os.path.exists(str(params.data_dir_mount_file)):
@@ -78,7 +75,7 @@ def _get_data_dir_to_mount_from_file():
   return data_dir_to_mount
 
 
-def handle_dfs_data_dir(func, params):
+def handle_dfs_data_dir(func, params, update_cache=True):
   """
   This function determines which DFS data dir paths can be created.
   There are 2 use cases:
@@ -98,12 +95,24 @@ def handle_dfs_data_dir(func, params):
   :param func: Function that will be called if a directory will be created. This function
                will be called as func(data_dir, params)
   :param params: parameters to pass to function pointer
+  :param update_cache: Bool indicating whether to update the global cache of mount points
   """
-  prev_data_dir_to_mount_point = _get_data_dir_to_mount_from_file()
 
+  # Get the data dirs that Ambari knows about and their last known mount point
+  prev_data_dir_to_mount_point = _get_data_dir_to_mount_from_file(params)
+
+  # Dictionary from data dir to the mount point that will be written to the history file.
+  # If a data dir becomes unmounted, we should still keep its original value.
+  # If a data dir was previously on / and is now mounted on a drive, we should store that too.
+  data_dir_to_mount_point = prev_data_dir_to_mount_point.copy()
+
+  # This should typically be False for customers, but True the first time.
   allowed_to_create_any_dir = params.data_dir_mount_file is None or not os.path.exists(params.data_dir_mount_file)
 
-  valid_data_dirs = []
+  valid_data_dirs = []                # data dirs that have been normalized
+  error_messages = []                 # list of error messages to report at the end
+  data_dirs_unmounted = set()         # set of data dirs that have become unmounted
+
   for data_dir in params.dfs_data_dir.split(","):
     if data_dir is None or data_dir.strip() == "":
       continue
@@ -112,38 +121,58 @@ def handle_dfs_data_dir(func, params):
     valid_data_dirs.append(data_dir)
 
     if not os.path.isdir(data_dir):
-      create_this_dir = allowed_to_create_any_dir
-      # Determine if should be allowed to create the data_dir directory
-      if not create_this_dir:
+      may_create_this_dir = allowed_to_create_any_dir
+      last_mount_point_for_dir = None
+
+      # Determine if should be allowed to create the data_dir directory.
+      # Either first time, became unmounted, or was just mounted on a drive
+      if not may_create_this_dir:
         last_mount_point_for_dir = prev_data_dir_to_mount_point[data_dir] if data_dir in prev_data_dir_to_mount_point else None
+
         if last_mount_point_for_dir is None:
-          # Couldn't retrieve any information about where this dir used to be mounted, so allow creating the directory
-          # to be safe.
-          create_this_dir = True
+          # Couldn't retrieve any information about where this dir used to be mounted, so allow creating the directory to be safe.
+          may_create_this_dir = True
         else:
           curr_mount_point = get_mount_point_for_dir(data_dir)
 
           # This means that create_this_dir will stay false if the directory became unmounted.
+          # In other words, allow creating if it was already on /, or it's currently not on /
           if last_mount_point_for_dir == "/" or (curr_mount_point is not None and curr_mount_point != "/"):
-            create_this_dir = True
+            may_create_this_dir = True
 
-      if create_this_dir:
-        Logger.info("Forcefully creating directory: %s" % str(data_dir))
+      if may_create_this_dir:
+        Logger.info("Forcefully creating directory: {0}".format(data_dir))
 
         # Call the function
         func(data_dir, params)
       else:
-        Logger.warning("Directory %s does not exist and became unmounted." % str(data_dir))
-
-  # Refresh the known mount points
-  get_and_cache_mount_points(refresh=True)
-
-  new_data_dir_to_mount_point = {}
+        # Not allowed to create this dir; if we had a prior mount record, it became unmounted.
+        if last_mount_point_for_dir is not None:
+          data_dirs_unmounted.add(data_dir)
+          msg = "Directory {0} does not exist and became unmounted from {1} .".format(data_dir,
last_mount_point_for_dir)
+          error_messages.append(msg)
+  pass
+
+  # This is set to false during unit tests.
+  if update_cache:
+    get_and_cache_mount_points(refresh=True)
+
+  # Update all data dirs (except the unmounted ones) with their current mount points.
   for data_dir in valid_data_dirs:
     # At this point, the directory may or may not exist
-    if os.path.isdir(data_dir):
+    if os.path.isdir(data_dir) and data_dir not in data_dirs_unmounted:
       curr_mount_point = get_mount_point_for_dir(data_dir)
-      new_data_dir_to_mount_point[data_dir] = curr_mount_point
+      data_dir_to_mount_point[data_dir] = curr_mount_point
 
   # Save back to the file
-  _write_data_dir_to_mount_in_file(new_data_dir_to_mount_point)
\ No newline at end of file
+  _write_data_dir_to_mount_in_file(params, data_dir_to_mount_point)
+
+  if error_messages and len(error_messages) > 0:
+    header = " ERROR ".join(["*****"] * 6)
+    header = "\n" + "\n".join([header, ] * 3) + "\n"
+    msg = " ".join(error_messages) + \
+          " Please remount the data dir(s) and run this command again. To ignore this failure
and allow writing to the " \
+          "root partition, either update the contents of {0}, or delete that file.".format(params.data_dir_mount_file)
+    Logger.error(header + msg + header)
+
+
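
With params now passed explicitly and the mount-cache refresh switchable, a
caller drives the helper the same way the new unit tests do. A hedged usage
sketch; the Params class and paths below are illustrative, not part of the
commit:

    from resource_management.libraries.functions import dfs_datanode_helper

    def create_dir(data_dir, params):
        # A real caller would create the directory with the proper owner and mode.
        print "would create {0}".format(data_dir)

    class Params(object):
        data_dir_mount_file = "/etc/hadoop/conf/dfs_data_dir_mount.hist"
        dfs_data_dir = "/grid/0/data,/grid/1/data"

    # update_cache=False skips get_and_cache_mount_points(refresh=True),
    # which the unit tests above rely on to stay hermetic.
    dfs_datanode_helper.handle_dfs_data_dir(create_dir, Params(), update_cache=False)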

