From: philz@apache.org
To: commits@impala.apache.org
Date: Fri, 02 Feb 2018 18:51:30 -0000
Subject: [06/19] impala git commit: IMPALA-6215: Removes race when using LibCache.

IMPALA-6215: Removes race when using LibCache.

LibCache's api to provide access to locally cached files has a race.
Currently, the client of the cache accesses the locally cached path as a string, but nothing guarantees that the associated file is not removed before the client is done using it. This race is suspected as the root cause for the flakiness seen in IMPALA-6092. These tests fail once in a while with classloader errors unable to load java udf classes. In these tests, the lib cache makes no guarantee that the path to the jar will remain valid from the time the path is acquired through the time needed to fetch the jar and resolve the needed classes.

LibCache offers liveness guarantees for shared objects via reference counting. The fix in this patch extends this API to also cover paths to locally cached files.

Testing:
- added a test to test_udfs.py that does many concurrent udf uses and removals. By increasing the concurrent operations to 100, the issue in IMPALA-6092 is locally reproducible on every run. With this fix, the problem is no longer reproducible with this test.

Change-Id: I9175085201fe8b11424ab8f88d7b992cb7b0daea
Reviewed-on: http://gerrit.cloudera.org:8080/9089
Reviewed-by: Tim Armstrong
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/885776ed
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/885776ed
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/885776ed

Branch: refs/heads/2.x
Commit: 885776ed2360c8b280b53d1806154821a4f32a0d
Parents: 44ba20a
Author: Vuk Ercegovac
Authored: Tue Nov 21 08:41:03 2017 -0800
Committer: Impala Public Jenkins
Committed: Fri Feb 2 01:10:15 2018 +0000

----------------------------------------------------------------------
 be/src/codegen/llvm-codegen.cc               |  5 +-
 be/src/exec/external-data-source-executor.cc |  5 +-
 be/src/exprs/hive-udf-call.cc                | 57 +++++++--------
 be/src/exprs/hive-udf-call.h                 |  3 -
 be/src/runtime/lib-cache.cc                  | 22 +++---
 be/src/runtime/lib-cache.h                   | 44 ++++++++++--
 be/src/service/fe-support.cc                 | 11 +--
 tests/query_test/test_udfs.py                | 88 ++++++++++++++++++++++--
 8 files changed, 177 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index e1a606c..72293a7 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -327,9 +327,10 @@ Status LlvmCodeGen::LinkModuleFromLocalFs(const string& file) {
 
 Status LlvmCodeGen::LinkModuleFromHdfs(const string& hdfs_location) {
   if (linked_modules_.find(hdfs_location) != linked_modules_.end()) return Status::OK();
+  LibCacheEntryHandle handle;
   string local_path;
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(hdfs_location, LibCache::TYPE_IR,
-      &local_path));
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(hdfs_location, LibCache::TYPE_IR,
+      &handle, &local_path));
   RETURN_IF_ERROR(LinkModuleFromLocalFs(local_path));
   linked_modules_.insert(hdfs_location);
   return Status::OK();


http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/exec/external-data-source-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc
index 7c54f39..20fe50e 100644
--- a/be/src/exec/external-data-source-executor.cc
+++ b/be/src/exec/external-data-source-executor.cc
@@ -136,9 +136,10 @@ ExternalDataSourceExecutor::~ExternalDataSourceExecutor() {
 Status ExternalDataSourceExecutor::Init(const string& jar_path, const string& class_name,
     const string& api_version, const string& init_string) {
   DCHECK(!is_initialized_);
+  LibCacheEntryHandle handle;
   string local_jar_path;
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-      jar_path, LibCache::TYPE_JAR, &local_jar_path));
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
+      jar_path, LibCache::TYPE_JAR, &handle, &local_jar_path));
 
   JNIEnv* jni_env = getJNIEnv();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/exprs/hive-udf-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index 19e2e63..e1ac676 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -174,10 +174,6 @@ Status HiveUdfCall::Init(const RowDescriptor& row_desc, RuntimeState* state) {
   // Initialize children first.
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state));
 
-  // Copy the Hive Jar from hdfs to local file system.
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-      fn_.hdfs_location, LibCache::TYPE_JAR, &local_location_));
-
   // Initialize input_byte_offsets_ and input_buffer_size_
   for (int i = 0; i < GetNumChildren(); ++i) {
     input_byte_offsets_.push_back(input_buffer_size_);
@@ -202,30 +198,35 @@ Status HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope,
   JNIEnv* env = getJNIEnv();
   if (env == NULL) return Status("Failed to get/create JVM");
 
-  THiveUdfExecutorCtorParams ctor_params;
-  ctor_params.fn = fn_;
-  ctor_params.local_location = local_location_;
-  ctor_params.input_byte_offsets = input_byte_offsets_;
-
-  jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
-  jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
-  jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
-
-  ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
-  ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
-  ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
-  ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
-
-  jbyteArray ctor_params_bytes;
-
-  // Add a scoped cleanup jni reference object. This cleans up local refs made
-  // below.
-  JniLocalFrame jni_frame;
-  RETURN_IF_ERROR(jni_frame.push(env));
-
-  RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
-  // Create the java executor object
-  jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+  {
+    LibCacheEntryHandle handle;
+    string local_location;
+    RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
+        fn_.hdfs_location, LibCache::TYPE_JAR, &handle, &local_location));
+    THiveUdfExecutorCtorParams ctor_params;
+    ctor_params.fn = fn_;
+    ctor_params.local_location = local_location;
+    ctor_params.input_byte_offsets = input_byte_offsets_;
+
+    jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
+    jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
+    jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
+
+    ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
+    ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
+    ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
+    ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
+
+    jbyteArray ctor_params_bytes;
+
+    // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+    JniLocalFrame jni_frame;
+    RETURN_IF_ERROR(jni_frame.push(env));
+
+    RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+    // Create the java executor object
+    jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+  }
 
   RETURN_ERROR_IF_EXC(env);
   RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor));


http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/exprs/hive-udf-call.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h
index 7ce5eb0..8ca0372 100644
--- a/be/src/exprs/hive-udf-call.h
+++ b/be/src/exprs/hive-udf-call.h
@@ -116,9 +116,6 @@ class HiveUdfCall : public ScalarExpr {
   /// error.
   AnyVal* Evaluate(ScalarExprEvaluator* eval, const TupleRow* row) const;
 
-  /// The path on the local FS to the UDF's jar
-  std::string local_location_;
-
   /// input_byte_offsets_[i] is the byte offset child ith's input argument should
   /// be written to.
   std::vector input_byte_offsets_;


http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index d694c49..b4a4f59 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -129,6 +129,10 @@ LibCacheEntry::~LibCacheEntry() {
   unlink(local_path.c_str());
 }
 
+LibCacheEntryHandle::~LibCacheEntryHandle() {
+  if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry_);
+}
+
 Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& symbol,
     void** fn_ptr, LibCacheEntry** ent, bool quiet) {
   if (hdfs_lib_file.empty()) {
@@ -173,21 +177,23 @@ void LibCache::DecrementUseCount(LibCacheEntry* entry) {
   if (entry == NULL) return;
   bool can_delete = false;
   {
-    unique_lock lock(entry->lock);;
+    unique_lock lock(entry->lock);
     --entry->use_count;
     can_delete = (entry->use_count == 0 && entry->should_remove);
   }
   if (can_delete) delete entry;
 }
 
-Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type,
-    string* local_path) {
+Status LibCache::GetLocalPath(const std::string& hdfs_lib_file, LibType type,
+    LibCacheEntryHandle* handle, string* path) {
+  DCHECK(handle != nullptr && handle->entry() == nullptr);
+  LibCacheEntry* entry = nullptr;
   unique_lock lock;
-  LibCacheEntry* entry = NULL;
   RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
-  DCHECK(entry != NULL);
-  DCHECK_EQ(entry->type, type);
-  *local_path = entry->local_path;
+  DCHECK(entry != nullptr);
+  ++entry->use_count;
+  handle->SetEntry(entry);
+  *path = entry->local_path;
   return Status::OK();
 }
 
@@ -352,7 +358,7 @@ Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type
   entry_lock->swap(local_entry_lock);
 
   RETURN_IF_ERROR((*entry)->copy_file_status);
-  DCHECK_EQ((*entry)->type, type);
+  DCHECK_EQ((*entry)->type, type) << (*entry)->local_path;
   DCHECK(!(*entry)->local_path.empty());
   return Status::OK();
 }


http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index 4a564ee..7296a00 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -49,11 +49,16 @@ class RuntimeState;
 /// using the library. When the caller requests a ptr into the library, they
 /// are given the entry handle and must decrement the ref count when they
 /// are done.
+/// Note: Explicitly managing this reference count at the client is error-prone. See the
+/// api for accessing a path, GetLocalPath(), that uses the handle's scope to manage the
+/// reference count.
 //
 /// TODO:
 /// - refresh libraries
-/// - better cached module management.
+/// - better cached module management
+/// - improve the api to be less error-prone (IMPALA-6439)
 struct LibCacheEntry;
+class LibCacheEntryHandle;
 
 class LibCache {
  public:
@@ -71,11 +76,16 @@ class LibCache {
   /// Initializes the libcache. Must be called before any other APIs.
   static Status Init();
 
-  /// Gets the local file system path for the library at 'hdfs_lib_file'. If
-  /// this file is not already on the local fs, it copies it and caches the
-  /// result. Returns an error if 'hdfs_lib_file' cannot be copied to the local fs.
-  Status GetLocalLibPath(const std::string& hdfs_lib_file, LibType type,
-      std::string* local_path);
+  /// Gets the local 'path' used to cache the file stored at the global 'hdfs_lib_file'.
+  /// If the referenced global file has not been copied locally, it copies it and
+  /// caches the result.
+  ///
+  /// 'handle' must remain in scope while 'path' is used. The reference count to the
+  /// underlying cache entry is decremented when 'handle' goes out-of-scope.
+  ///
+  /// Returns an error if 'hdfs_lib_file' cannot be copied to the local fs.
+  Status GetLocalPath(const std::string& hdfs_lib_file, LibType type,
+      LibCacheEntryHandle* handle, string* path);
 
   /// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok otherwise.
   /// If 'quiet' is true, the error status for non-Java unfound symbols will not be logged.
@@ -94,6 +104,7 @@ class LibCache {
   /// using fn_ptr and it is no longer valid to use fn_ptr.
   //
   /// If 'quiet' is true, returned error statuses will not be logged.
+  /// TODO: api is error-prone. upgrade to LibCacheEntryHandle (see IMPALA-6439).
   Status GetSoFunctionPtr(const std::string& hdfs_lib_file, const std::string& symbol,
       void** fn_ptr, LibCacheEntry** entry, bool quiet = false);
 
@@ -164,6 +175,27 @@ class LibCache {
       const LibMap::iterator& entry_iterator);
 };
 
+/// Handle for a LibCacheEntry that decrements its reference count when the handle is
+/// destroyed or re-used for another entry.
+class LibCacheEntryHandle {
+ public:
+  LibCacheEntryHandle() {}
+  ~LibCacheEntryHandle();
+
+ private:
+  friend class LibCache;
+
+  LibCacheEntry* entry() const { return entry_; }
+  void SetEntry(LibCacheEntry* entry) {
+    if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry);
+    entry_ = entry;
+  }
+
+  LibCacheEntry* entry_ = nullptr;
+
+  DISALLOW_COPY_AND_ASSIGN(LibCacheEntryHandle);
+};
+
 }
 
 #endif


http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 35b5a15..0a84d09 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -294,9 +294,11 @@ static void ResolveSymbolLookup(const TSymbolLookupParams params,
   if (params.fn_binary_type != TFunctionBinaryType::BUILTIN) {
     // Refresh the library if necessary since we're creating a new function
     LibCache::instance()->SetNeedsRefresh(params.location);
+    LibCacheEntryHandle handle;
     string dummy_local_path;
-    Status status = LibCache::instance()->GetLocalLibPath(
-        params.location, type, &dummy_local_path);
+    Status status = LibCache::instance()->GetLocalPath(
+        params.location, type, &handle, &dummy_local_path);
+
     if (!status.ok()) {
       result->__set_result_code(TSymbolLookupResultCode::BINARY_NOT_FOUND);
       result->__set_error_msg(status.GetDetail());
@@ -387,9 +389,10 @@ Java_org_apache_impala_service_FeSupport_NativeCacheJar(
       JniUtil::internal_exc_class(), nullptr);
 
   TCacheJarResult result;
+  LibCacheEntryHandle handle;
   string local_path;
-  Status status = LibCache::instance()->GetLocalLibPath(params.hdfs_location,
-      LibCache::TYPE_JAR, &local_path);
+  Status status = LibCache::instance()->GetLocalPath(
+      params.hdfs_location, LibCache::TYPE_JAR, &handle, &local_path);
   status.ToThrift(&result.status);
   if (status.ok()) result.__set_local_path(local_path);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 1ff716a..61dd54c 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -18,6 +18,9 @@
 from copy import copy
 import os
 import pytest
+import random
+import threading
+import time
 from subprocess import check_call
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
@@ -316,8 +319,6 @@ class TestUdfExecution(TestUdfBase):
     self.run_test_case('QueryTest/udf-non-deterministic', vector,
         use_db=unique_database)
 
-  # Runs serially as a temporary workaround for IMPALA_6092.
-  @pytest.mark.execute_serially
   def test_java_udfs(self, vector, unique_database):
     self.run_test_case('QueryTest/load-java-udfs', vector, use_db=unique_database)
     self.run_test_case('QueryTest/java-udf', vector, use_db=unique_database)
@@ -418,9 +419,6 @@ class TestUdfTargeted(TestUdfBase):
         unique_database, tgt_udf_path))
     query = "select `{0}`.fn_invalid_symbol('test')".format(unique_database)
 
-    # Dropping the function can interact with other tests whose Java classes are in
-    # the same jar. Use a copy of the jar to avoid unintended interactions.
-    # See IMPALA-6215 and IMPALA-6092 for examples.
     check_call(["hadoop", "fs", "-put", "-f", src_udf_path, tgt_udf_path])
     self.client.execute(drop_fn_stmt)
     self.client.execute(create_fn_stmt)
@@ -429,6 +427,86 @@ class TestUdfTargeted(TestUdfBase):
       assert "Unable to find class" in str(ex)
     self.client.execute(drop_fn_stmt)
 
+  def test_concurrent_jar_drop_use(self, vector, unique_database):
+    """IMPALA-6215: race between dropping/using java udf's defined in the same jar.
+       This test runs concurrent drop/use threads that result in class not found
+       exceptions when the race is present.
+    """
+    udf_src_path = os.path.join(
+      os.environ['IMPALA_HOME'], "testdata/udfs/impala-hive-udfs.jar")
+    udf_tgt_path = get_fs_path(
+      '/test-warehouse/impala-hive-udfs-{0}.jar'.format(unique_database))
+
+    create_fn_to_drop = """create function {0}.foo_{1}() returns string
+                           LOCATION '{2}' SYMBOL='org.apache.impala.TestUpdateUdf'"""
+    create_fn_to_use = """create function {0}.use_it(string) returns string
+                          LOCATION '{1}' SYMBOL='org.apache.impala.TestUdf'"""
+    drop_fn = "drop function if exists {0}.foo_{1}()"
+    use_fn = """select * from (select max(int_col) from functional.alltypesagg
+                where {0}.use_it(string_col) = 'blah' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(String_col) > '1' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(string_col) > '1'))) v"""
+    num_drops = 100
+    num_uses = 100
+
+    # use a unique jar for this test to avoid interactions with other tests
+    # that use the same jar
+    check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path])
+
+    # create all the functions.
+    setup_client = self.create_impala_client()
+    try:
+      s = create_fn_to_use.format(unique_database, udf_tgt_path)
+      print "use create: " + s
+      setup_client.execute(s)
+    except Exception as e:
+      print e
+      assert False
+    for i in range(0, num_drops):
+      try:
+        setup_client.execute(create_fn_to_drop.format(unique_database, i, udf_tgt_path))
+      except Exception as e:
+        print e
+        assert False
+
+    errors = []
+    def use_fn_method():
+      time.sleep(5 + random.random())
+      client = self.create_impala_client()
+      try:
+        client.execute(use_fn.format(unique_database))
+      except Exception as e: errors.append(e)
+
+    def drop_fn_method(i):
+      time.sleep(1 + random.random())
+      client = self.create_impala_client()
+      try:
+        client.execute(drop_fn.format(unique_database, i))
+      except Exception as e: errors.append(e)
+
+    # create threads to use functions.
+    runner_threads = []
+    for i in range(0, num_uses):
+      runner_threads.append(threading.Thread(target=use_fn_method))
+
+    # create threads to drop functions.
+    drop_threads = []
+    for i in range(0, num_drops):
+      runner_threads.append(threading.Thread(target=drop_fn_method, args=(i, )))
+
+    # launch all runner threads.
+    for t in runner_threads: t.start()
+
+    # join all threads.
+    for t in runner_threads: t.join();
+
+    # Check for any errors.
+    for e in errors: print e
+    assert len(errors) == 0
+
+
   @SkipIfLocal.multiple_impalad
   def test_hive_udfs_missing_jar(self, vector, unique_database):
     """ IMPALA-2365: Impalad shouldn't crash if the udf jar isn't present
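The commit message describes the fix in terms of reference counting: a locally cached file may only be deleted once no caller still uses its path, and the caller's handle scope decides when that is. The block below is a minimal, self-contained C++ sketch of that scope-based (RAII) pattern, assuming a much simplified cache. `Entry`, `EntryHandle`, `Cache`, `Remove()`, `num_destroyed()` and the `/tmp/lib-cache/` prefix are hypothetical stand-ins for illustration, not Impala's API; the sketch skips HDFS copying, file unlinking and LibCache's real locking.

```cpp
#include <atomic>
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical stand-in for LibCacheEntry.
struct Entry {
  std::mutex lock;             // guards use_count and should_remove
  std::string local_path;
  int use_count = 0;           // number of live handles pinning this entry
  bool should_remove = false;  // set once the entry is dropped from the cache
};

class Cache;

// Hypothetical stand-in for LibCacheEntryHandle: pins an Entry while in scope.
class EntryHandle {
 public:
  EntryHandle() = default;
  ~EntryHandle();
  EntryHandle(const EntryHandle&) = delete;
  EntryHandle& operator=(const EntryHandle&) = delete;

 private:
  friend class Cache;
  void SetEntry(Cache* cache, Entry* entry);
  Cache* cache_ = nullptr;
  Entry* entry_ = nullptr;
};

// Hypothetical stand-in for LibCache. Handles must not outlive the cache.
class Cache {
 public:
  ~Cache() { for (auto& kv : entries_) delete kv.second; }

  // Returns the local path for 'key'; it stays valid while 'handle' is alive.
  std::string GetLocalPath(const std::string& key, EntryHandle* handle) {
    assert(handle != nullptr && handle->entry_ == nullptr);
    std::lock_guard<std::mutex> map_guard(map_lock_);
    Entry*& e = entries_[key];
    if (e == nullptr) {
      e = new Entry;
      e->local_path = "/tmp/lib-cache/" + key;  // real code copies the file here
    }
    {
      std::lock_guard<std::mutex> entry_guard(e->lock);
      ++e->use_count;
    }
    handle->SetEntry(this, e);
    return e->local_path;
  }

  // Drops 'key' (e.g. on DROP FUNCTION). The entry is destroyed now if unused,
  // otherwise by whichever handle releases it last.
  void Remove(const std::string& key) {
    Entry* e = nullptr;
    {
      std::lock_guard<std::mutex> map_guard(map_lock_);
      auto it = entries_.find(key);
      if (it == entries_.end()) return;
      e = it->second;
      entries_.erase(it);
    }
    bool can_delete = false;
    {
      std::lock_guard<std::mutex> entry_guard(e->lock);
      e->should_remove = true;
      can_delete = (e->use_count == 0);
    }
    if (can_delete) Destroy(e);
  }

  void DecrementUseCount(Entry* e) {
    bool can_delete = false;
    {
      std::lock_guard<std::mutex> entry_guard(e->lock);
      --e->use_count;
      can_delete = (e->use_count == 0 && e->should_remove);
    }
    if (can_delete) Destroy(e);
  }

  int num_destroyed() const { return num_destroyed_.load(); }

 private:
  // Real code would also unlink e->local_path here.
  void Destroy(Entry* e) { ++num_destroyed_; delete e; }
  std::mutex map_lock_;
  std::map<std::string, Entry*> entries_;
  std::atomic<int> num_destroyed_{0};
};

EntryHandle::~EntryHandle() {
  if (entry_ != nullptr) cache_->DecrementUseCount(entry_);
}

void EntryHandle::SetEntry(Cache* cache, Entry* entry) {
  if (entry_ != nullptr) cache_->DecrementUseCount(entry_);  // release old pin
  cache_ = cache;
  entry_ = entry;
}
```

A caller declares the handle in the same scope in which it uses the path, the way the patch places `LibCacheEntryHandle handle;` next to each `GetLocalPath()` call. For example, if `Remove("udfs.jar")` runs while a handle for that key is still alive, the entry survives until the handle goes out of scope. Deferring deletion this way means the dropping thread never blocks on readers, and whichever side observes `use_count == 0` together with `should_remove` performs the single delete.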