ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aonis...@apache.org
Subject ambari git commit: AMBARI-15762. Component install post processing can not be run in parallel (aonishuk)
Date Thu, 07 Apr 2016 17:30:35 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 016a303bd -> 5f0a09690


AMBARI-15762. Component install post processing can not be run in parallel (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5f0a0969
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5f0a0969
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5f0a0969

Branch: refs/heads/trunk
Commit: 5f0a09690310fee76c4cab3639c1c4ec08ecca8a
Parents: 016a303
Author: Andrew Onishuk <aonishuk@hortonworks.com>
Authored: Thu Apr 7 20:29:50 2016 +0300
Committer: Andrew Onishuk <aonishuk@hortonworks.com>
Committed: Thu Apr 7 20:30:19 2016 +0300

----------------------------------------------------------------------
 .../TestFileBasedProcessLock.py                 | 61 ++++++++++++++++
 .../functions/file_based_process_lock.py        | 73 ++++++++++++++++++++
 .../2.0.6/hooks/after-INSTALL/scripts/params.py |  5 ++
 .../scripts/shared_initialization.py            |  8 ++-
 4 files changed, 145 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5f0a0969/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py
b/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py
new file mode 100644
index 0000000..e4606cc
--- /dev/null
+++ b/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py
@@ -0,0 +1,61 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import tempfile
+import time
+import shutil
+from unittest import TestCase
+from multiprocessing import Process
+from only_for_platform import  not_for_platform, PLATFORM_WINDOWS
+from resource_management.libraries.functions.file_based_process_lock import FileBasedProcessLock
+
+class TestFileBasedProcessLock(TestCase):
+  """Checks that FileBasedProcessLock serializes work done by separate processes."""
+
+  @not_for_platform(PLATFORM_WINDOWS)
+  def test_file_based_lock(self):
+    """
+    Test FileBasedProcessLock using mkdir atomicity.
+
+    Three child processes each create and then remove the same directory
+    while holding the lock. os.mkdir raises when the directory already
+    exists, so a child fails if two of them are inside the locked section
+    at the same time, and the exit code assertion below catches it.
+    """
+    test_temp_dir = tempfile.mkdtemp(prefix="test_file_based_lock")
+    process_list = []
+    try:
+      indicator_dir = os.path.join(test_temp_dir, "indicator")
+      lock_file = os.path.join(test_temp_dir, "lock")
+
+      # Raises an exception if mkdir operation fails.
+      # It indicates that more than one process acquired the lock.
+      def dummy_task(index):
+        with FileBasedProcessLock(lock_file):
+          os.mkdir(indicator_dir)
+          time.sleep(0.1)
+          os.rmdir(indicator_dir)
+
+      for i in range(0, 3):
+        p = Process(target=dummy_task, args=(i,))
+        p.start()
+        process_list.append(p)
+
+      for p in process_list:
+        p.join(2)
+        # exitcode is still None if the child did not finish within the timeout,
+        # so a hung child fails the assertion as well.
+        self.assertEqual(p.exitcode, 0)
+
+    finally:
+      # Stop any child that is still running before its working directory
+      # is removed, so a failed run does not leave stray processes behind.
+      for p in process_list:
+        if p.is_alive():
+          p.terminate()
+          p.join()
+      shutil.rmtree(test_temp_dir)
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/5f0a0969/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py
b/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py
new file mode 100644
index 0000000..f9c981d
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+import fcntl
+
+from resource_management.core.logger import Logger
+
+class FileBasedProcessLock(object):
+  """A file descriptor based lock for interprocess locking.
+  The lock is automatically released when process dies.
+
+  Can be used as a context manager: the lock is taken on entry to the
+  ``with`` block and released on exit.
+
+  WARNING: Do not use this lock for synchronization between threads.
+  Multiple threads in the same process can simultaneously acquire this lock.
+  It should be used only for locking between processes.
+  """
+
+  def __init__(self, lock_file_path):
+    """
+    :param lock_file_path: The path to the file used for locking
+    """
+    self.lock_file_name = lock_file_path
+    # Open file object while the lock file is in use, otherwise None.
+    self.lock_file = None
+
+  def blocking_lock(self):
+    """
+    Creates the lock file if it doesn't exist.
+    Waits to acquire an exclusive lock on the lock file descriptor.
+
+    If acquiring the lock raises, the lock file is closed again before the
+    exception propagates, so a failed attempt does not leak a descriptor.
+    """
+    Logger.info("Trying to acquire a lock on {0}".format(self.lock_file_name))
+    if self.lock_file is None or self.lock_file.closed:
+      self.lock_file = open(self.lock_file_name, 'a')
+    acquired = False
+    try:
+      fcntl.lockf(self.lock_file, fcntl.LOCK_EX)
+      acquired = True
+    finally:
+      if not acquired:
+        self._close_lock_file()
+    Logger.info("Acquired the lock on {0}".format(self.lock_file_name))
+
+  def unlock(self):
+    """
+    Unlocks the lock file descriptor and closes the lock file.
+
+    Does nothing when no lock file is currently open, so calling it without
+    a preceding blocking_lock() (or calling it twice) is safe.
+    """
+    if self.lock_file is None or self.lock_file.closed:
+      self.lock_file = None
+      return
+    Logger.info("Releasing the lock on {0}".format(self.lock_file_name))
+    try:
+      fcntl.lockf(self.lock_file, fcntl.LOCK_UN)
+    finally:
+      # Close even if the explicit unlock failed.
+      self._close_lock_file()
+
+  def _close_lock_file(self):
+    """
+    Closes the lock file, ignoring IOError, and clears the stored handle.
+    """
+    try:
+      self.lock_file.close()
+    except IOError:
+      pass
+    self.lock_file = None
+
+  def __enter__(self):
+    """Acquires the lock; the ``as`` target of the with statement is None."""
+    self.blocking_lock()
+    return None
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    """Releases the lock; returns False so exceptions are not suppressed."""
+    self.unlock()
+    return False
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/5f0a0969/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
index 9f4971d..9cef622 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py
@@ -17,6 +17,8 @@ limitations under the License.
 
 """
 
+import os
+
 from ambari_commons.constants import AMBARI_SUDO_BINARY
 from resource_management.libraries.script import Script
 from resource_management.libraries.functions import default
@@ -26,6 +28,7 @@ from resource_management.libraries.functions import format_jvm_option
 from resource_management.libraries.functions.version import format_stack_version
 
 config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
 
 dfs_type = default("/commandParams/dfs_type", "")
 
@@ -89,3 +92,5 @@ has_namenode = not len(namenode_host) == 0
 
 if has_namenode or dfs_type == 'HCFS':
   hadoop_conf_dir = conf_select.get_hadoop_conf_dir(force_latest_on_upgrade=True)
+
+link_configs_lock_file = os.path.join(tmp_dir, "link_configs_lock_file")

http://git-wip-us.apache.org/repos/asf/ambari/blob/5f0a0969/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
index c772ddb..b83b115 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py
@@ -24,6 +24,7 @@ from resource_management.libraries.functions import conf_select
 from resource_management.libraries.functions import stack_select
 from resource_management.libraries.functions.format import format
 from resource_management.libraries.functions.version import compare_versions
+from resource_management.libraries.functions.file_based_process_lock import FileBasedProcessLock
 from resource_management.libraries.resources.xml_config import XmlConfig
 from resource_management.libraries.script import Script
 
@@ -86,6 +87,7 @@ def link_configs(struct_out_file):
   """
   Links configs, only on a fresh install of HDP-2.3 and higher
   """
+  import params
 
   if not Script.is_stack_greater_or_equal("2.3"):
     Logger.info("Can only link configs for HDP-2.3 and higher.")
@@ -97,5 +99,7 @@ def link_configs(struct_out_file):
     Logger.info("Could not load 'version' from {0}".format(struct_out_file))
     return
 
-  for k, v in conf_select.get_package_dirs().iteritems():
-    conf_select.convert_conf_directories_to_symlinks(k, json_version, v)
+  # On parallel command execution this should be executed by a single process at a time.
+  with FileBasedProcessLock(params.link_configs_lock_file):
+    for k, v in conf_select.get_package_dirs().iteritems():
+      conf_select.convert_conf_directories_to_symlinks(k, json_version, v)


Mime
View raw message