impala-commits mailing list archives

From he...@apache.org
Subject [5/5] incubator-impala git commit: IMPALA-4432: Handle internal codegen disabling properly
Date Tue, 22 Nov 2016 18:08:35 GMT
IMPALA-4432: Handle internal codegen disabling properly

There are some conditions in which codegen is disabled internally
even if it's enabled by the query option, for instance the single-node
optimization or expression evaluation requests sent from the FE to
the BE. This internal disabling of codegen is advisory, as its
purpose is to reduce latency for tables with no or very few rows.
However, it doesn't interact well with UDFs which cannot be
interpreted (e.g. IR UDFs), because it is conflated with the
'disable_codegen' query option set by the user. As a result, it's
hard to differentiate between codegen being disabled explicitly by
the user and codegen being disabled internally.
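To make the conflation concrete, here is a minimal C++ sketch. It assumes, as the paragraph above implies, that internal disabling previously reused the same flag as the user-facing option. `OldQueryOptions`, `PrepareIrUdfOld()` and `OptionsForFeExprEval()` are hypothetical names, not Impala code; the error text paraphrases the message in ScalarFnCall::Prepare().

```cpp
#include <cassert>
#include <string>

// Hypothetical model of the pre-change behavior: a single boolean stands for
// both "the user set DISABLE_CODEGEN" and "codegen was disabled internally".
struct OldQueryOptions {
  bool disable_codegen = false;
};

// Returns an empty string if an IR UDF can run, otherwise the error message.
// IR UDFs cannot be interpreted, so any disabled codegen is fatal here, and
// the message always blames the user-facing option.
std::string PrepareIrUdfOld(const OldQueryOptions& opts) {
  if (opts.disable_codegen) {
    return "Cannot interpret LLVM IR UDF: Codegen is needed. "
           "Please set DISABLE_CODEGEN to false.";
  }
  return "";
}

// Hypothetical internal path (e.g. constant-expression evaluation requested by
// the FE) that turns codegen off by writing into the same field.
OldQueryOptions OptionsForFeExprEval(OldQueryOptions user_opts) {
  user_opts.disable_codegen = true;  // latency optimization, not a user choice
  return user_opts;
}
```

With only one flag, the internal path yields an error telling the user to change an option they never set.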

This change fixes the problem above by adding an explicit flag in
TQueryCtx to indicate that codegen is disabled internally. This flag
is only advisory: in cases where codegen is required for execution,
the internal flag is ignored, and if codegen is disabled via the
query option, an error is returned. For this new flag to work with
ScalarFnCall, codegen needs to happen after ScalarFnCall::Prepare(),
because it's hard to tell whether a fragment contains any UDF that
cannot be interpreted until ScalarFnCall::Prepare() has been called.
However, Prepare() currently needs the codegen object in order to
codegen, so the object has to be created before Prepare(). We can
either always create the codegen module, or defer codegen to a point
after ScalarFnCall::Prepare(). The former introduces unnecessary
latency for, e.g., the single-node optimization, so the latter is
implemented. It is needed as part of IMPALA-4192 anyway.
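The decision described above can be modeled in a few lines. This is a simplified sketch, not the actual RuntimeState: the predicate names mirror those used in the diff below, but `CodegenFlags`, its fields, `UdfPath` and `PrepareUdf()` are hypothetical, CHAR arguments are ignored, and it assumes that CodegenDisabledByHint() holds only when the hint is set and no expression was registered for codegen during Prepare().

```cpp
#include <cassert>

// Hypothetical, simplified model of the per-fragment codegen state.
struct CodegenFlags {
  bool disable_codegen_option = false;  // DISABLE_CODEGEN set by the user
  bool disable_codegen_hint = false;    // advisory flag set internally
  int num_scalar_fns_to_codegen = 0;    // filled in during Prepare()
};

bool CodegenDisabledByQueryOption(const CodegenFlags& f) {
  return f.disable_codegen_option;
}

bool CodegenHasDisableHint(const CodegenFlags& f) {
  return f.disable_codegen_hint;
}

// Assumption: the hint only takes effect if no expression required codegen.
bool CodegenDisabledByHint(const CodegenFlags& f) {
  return CodegenHasDisableHint(f) && f.num_scalar_fns_to_codegen == 0;
}

bool ShouldCodegen(const CodegenFlags& f) {
  return !CodegenDisabledByQueryOption(f) && !CodegenDisabledByHint(f);
}

enum class UdfPath { kInterpret, kCodegen, kError };

// Follows the condition in ScalarFnCall::Prepare(): interpret if the user
// disabled codegen, or if the hint is set and the UDF can be interpreted.
UdfPath PrepareUdf(CodegenFlags* f, bool udf_interpretable) {
  if (CodegenDisabledByQueryOption(*f) ||
      (CodegenHasDisableHint(*f) && udf_interpretable)) {
    // Only the user option can force this branch for a non-interpretable UDF.
    return udf_interpretable ? UdfPath::kInterpret : UdfPath::kError;
  }
  ++f->num_scalar_fns_to_codegen;  // stands in for AddScalarFnToCodegen(this)
  return UdfPath::kCodegen;
}
```

Keeping the hint separate from the option lets a non-interpretable UDF override the hint instead of failing, while a user-set DISABLE_CODEGEN still produces an error.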

After this change, ScalarFnCall::Prepare() inserts ScalarFnCall
expressions that need to be codegen'd into a vector in RuntimeState.
Later, in the codegen phase, GetCodegendComputeFn() is called on each
of these expressions after codegen for the operators is done. If an
expression has already been codegen'd indirectly by an operator,
GetCodegendComputeFn() is a no-op. This preserves the behavior that
ScalarFnCall is always codegen'd, even if the fragment doesn't
contain any codegen-enabled operators.
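A minimal sketch of this deferred bookkeeping, using simplified stand-ins: `FakeScalarFn` and `FakeRuntimeState` are hypothetical, and their method names only echo AddScalarFnToCodegen(), CodegenScalarFns() and GetCodegendComputeFn() from the diff. No LLVM work is modeled; a counter records how often real codegen would happen.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for ScalarFnCall; models only idempotent codegen.
struct FakeScalarFn {
  bool codegend = false;  // plays the role of ir_compute_fn_ != NULL
  int codegen_calls = 0;  // how many times real codegen work was done

  // A second call after codegen is a no-op.
  void GetCodegendComputeFn() {
    if (codegend) return;
    ++codegen_calls;
    codegend = true;
  }
};

// Hypothetical stand-in for RuntimeState's list of expressions to codegen.
struct FakeRuntimeState {
  std::vector<FakeScalarFn*> scalar_fns_to_codegen;

  // Called from Prepare(), before any codegen object exists.
  void AddScalarFnToCodegen(FakeScalarFn* fn) {
    scalar_fns_to_codegen.push_back(fn);
  }

  // Called after the operators' Codegen(); picks up any expression the
  // operators did not already codegen indirectly.
  void CodegenScalarFns() {
    for (FakeScalarFn* fn : scalar_fns_to_codegen) fn->GetCodegendComputeFn();
  }
};
```

Because GetCodegendComputeFn() returns early once an expression is codegen'd, registering every expression up front is safe even when operators later codegen some of them.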

Change-Id: I0b6a9ed723c64ba21b861608583cc9b6607d3397
Reviewed-on: http://gerrit.cloudera.org:8080/5105
Reviewed-by: Michael Ho <kwho@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b7eeb8bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b7eeb8bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b7eeb8bf

Branch: refs/heads/master
Commit: b7eeb8bf85ef24b730c9ba2891f5a8075ba9605e
Parents: ac44d6c
Author: Michael Ho <kwho@cloudera.com>
Authored: Thu Nov 10 22:39:45 2016 -0800
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Tue Nov 22 14:56:03 2016 +0000

----------------------------------------------------------------------
 be/src/exec/aggregation-node.cc                 |  6 +-
 be/src/exec/exchange-node.cc                    |  6 +-
 be/src/exec/exec-node.cc                        | 10 +-
 be/src/exec/exec-node.h                         |  3 +
 be/src/exec/hash-join-node.cc                   |  6 +-
 be/src/exec/hdfs-avro-scanner.cc                |  2 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  2 +-
 be/src/exec/hdfs-scan-node-base.cc              |  4 +-
 be/src/exec/hdfs-sequence-scanner.cc            |  2 +-
 be/src/exec/hdfs-text-scanner.cc                |  2 +-
 be/src/exec/partitioned-aggregation-node.cc     |  6 +-
 be/src/exec/partitioned-hash-join-builder.cc    |  4 +-
 be/src/exec/partitioned-hash-join-node.cc       |  6 +-
 be/src/exec/sort-node.cc                        |  6 +-
 be/src/exec/topn-node.cc                        |  8 +-
 be/src/exprs/scalar-fn-call.cc                  | 96 +++++++++++---------
 be/src/exprs/scalar-fn-call.h                   | 10 +-
 be/src/runtime/plan-fragment-executor.cc        | 20 ++--
 be/src/runtime/runtime-state.cc                 |  9 ++
 be/src/runtime/runtime-state.h                  | 52 ++++++++++-
 be/src/service/fe-support.cc                    | 26 ++----
 common/thrift/ImpalaInternalService.thrift      |  5 +
 .../java/org/apache/impala/planner/Planner.java |  2 +-
 tests/query_test/test_udfs.py                   |  4 +-
 24 files changed, 180 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index a5c7490..f3b054e 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -158,14 +158,12 @@ Status AggregationNode::Prepare(RuntimeState* state) {
       vector<ExprContext*>(), 1, true,  vector<bool>(build_expr_ctxs_.size(), true), id(),
       mem_tracker(), vector<RuntimeFilter*>(), true));
 
-  if (!state->codegen_enabled()) {
-    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
-  }
+  AddCodegenDisabledMessage(state);
   return Status::OK();
 }
 
 void AggregationNode::Codegen(RuntimeState* state) {
-  DCHECK(state->codegen_enabled());
+  DCHECK(state->ShouldCodegen());
   bool codegen_enabled = false;
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != NULL);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 13522a7..2b2b033 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -85,15 +85,13 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
     AddExprCtxsToFree(sort_exec_exprs_);
     less_than_.reset(
         new TupleRowComparator(sort_exec_exprs_, is_asc_order_, nulls_first_));
-    if (!state->codegen_enabled()) {
-      runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
-    }
+    AddCodegenDisabledMessage(state);
   }
   return Status::OK();
 }
 
 void ExchangeNode::Codegen(RuntimeState* state) {
-  DCHECK(state->codegen_enabled());
+  DCHECK(state->ShouldCodegen());
   if (is_merging_) {
     Status codegen_status = less_than_->Codegen(state);
     runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 1d93626..95306ec 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -165,7 +165,7 @@ Status ExecNode::Prepare(RuntimeState* state) {
 }
 
 void ExecNode::Codegen(RuntimeState* state) {
-  DCHECK(state->codegen_enabled());
+  DCHECK(state->ShouldCodegen());
   DCHECK(state->codegen() != NULL);
   for (int i = 0; i < children_.size(); ++i) {
     children_[i]->Codegen(state);
@@ -457,6 +457,14 @@ void ExecNode::AddExprCtxsToFree(const SortExecExprs& sort_exec_exprs) {
   AddExprCtxsToFree(sort_exec_exprs.rhs_ordering_expr_ctxs());
 }
 
+void ExecNode::AddCodegenDisabledMessage(RuntimeState* state) {
+  if (state->CodegenDisabledByQueryOption()) {
+    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
+  } else if (state->CodegenDisabledByHint()) {
+    runtime_profile()->AddCodegenMsg(false, "disabled due to optimization hints");
+  }
+}
+
 // Codegen for EvalConjuncts.  The generated signature is
 // For a node with two conjunct predicates
 // define i1 @EvalConjuncts(%"class.impala::ExprContext"** %ctxs, i32 %num_ctxs,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index d5ca5e3..1dd2d32 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -184,6 +184,9 @@ class ExecNode {
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
   MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); }
 
+  /// Add codegen disabled message if codegen is disabled for this ExecNode.
+  void AddCodegenDisabledMessage(RuntimeState* state);
+
   /// Extract node id from p->name().
   static int GetNodeIdFromProfile(RuntimeProfile* p);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index 7c335be..a18f356 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -143,14 +143,12 @@ Status HashJoinNode::Prepare(RuntimeState* state) {
           child(1)->row_desc().tuple_descriptors().size(), stores_nulls,
           is_not_distinct_from_, state->fragment_hash_seed(), mem_tracker(), filters_));
   build_pool_.reset(new MemPool(mem_tracker()));
-  if (!state->codegen_enabled()) {
-    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
-  }
+  AddCodegenDisabledMessage(state);
   return Status::OK();
 }
 
 void HashJoinNode::Codegen(RuntimeState* state) {
-  DCHECK(state->codegen_enabled());
+  DCHECK(state->ShouldCodegen());
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != NULL);
   bool build_codegen_enabled = false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index c217b65..6c737a4 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -80,7 +80,7 @@ Status HdfsAvroScanner::Open(ScannerContext* context) {
 Status HdfsAvroScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** decode_avro_data_fn) {
   *decode_avro_data_fn = NULL;
-  DCHECK(node->runtime_state()->codegen_enabled());
+  DCHECK(node->runtime_state()->ShouldCodegen());
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
   DCHECK(codegen != NULL);
   Function* materialize_tuple_fn = NULL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 6b157aa..c8f3ad6 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -635,7 +635,7 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
 
 Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** process_scratch_batch_fn) {
-  DCHECK(node->runtime_state()->codegen_enabled());
+  DCHECK(node->runtime_state()->ShouldCodegen());
   *process_scratch_batch_fn = NULL;
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
   DCHECK(codegen != NULL);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index d31c6d1..e616faf 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -292,9 +292,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats);
   PrintHdfsSplitStats(per_volume_stats, &str);
   runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());
-  if (!state->codegen_enabled()) {
-    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
-  }
+  AddCodegenDisabledMessage(state);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index c9f4319..d802bd7 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -55,7 +55,7 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
 Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = NULL;
-  DCHECK(node->runtime_state()->codegen_enabled());
+  DCHECK(node->runtime_state()->ShouldCodegen());
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
   DCHECK(codegen != NULL);
   Function* write_complete_tuple_fn;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 6757815..07281df 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -696,7 +696,7 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
 Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = NULL;
-  DCHECK(node->runtime_state()->codegen_enabled());
+  DCHECK(node->runtime_state()->ShouldCodegen());
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
   DCHECK(codegen != NULL);
   Function* write_complete_tuple_fn;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 56db26f..01622b9 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -275,14 +275,12 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
     }
     DCHECK(serialize_stream_->has_write_block());
   }
-  if (!state->codegen_enabled()) {
-    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
-  }
+  AddCodegenDisabledMessage(state);
   return Status::OK();
 }
 
 void PartitionedAggregationNode::Codegen(RuntimeState* state) {
-  DCHECK(state->codegen_enabled());
+  DCHECK(state->ShouldCodegen());
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != NULL);
   TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index e577b22..cfb6e42 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -141,8 +141,10 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker)
   partition_build_rows_timer_ = ADD_TIMER(profile(), "BuildRowsPartitionTime");
   build_hash_table_timer_ = ADD_TIMER(profile(), "HashTablesBuildTime");
   repartition_timer_ = ADD_TIMER(profile(), "RepartitionTime");
-  if (!state->codegen_enabled()) {
+  if (state->CodegenDisabledByQueryOption()) {
     profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
+  } else if (state->CodegenDisabledByHint()) {
+    profile()->AddCodegenMsg(false, "disabled due to optimization hints");
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index c3211af..61b7eb7 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -135,14 +135,12 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
 
   num_probe_rows_partitioned_ =
       ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
-  if (!state->codegen_enabled()) {
-    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
-  }
+  AddCodegenDisabledMessage(state);
   return Status::OK();
 }
 
 void PartitionedHashJoinNode::Codegen(RuntimeState* state) {
-  DCHECK(state->codegen_enabled());
+  DCHECK(state->ShouldCodegen());
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != NULL);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 140e662..7518223 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -54,14 +54,12 @@ Status SortNode::Prepare(RuntimeState* state) {
   sorter_.reset(new Sorter(*less_than_.get(), sort_exec_exprs_.sort_tuple_slot_expr_ctxs(),
       &row_descriptor_, mem_tracker(), runtime_profile(), state));
   RETURN_IF_ERROR(sorter_->Init());
-  if (!state->codegen_enabled()) {
-    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
-  }
+  AddCodegenDisabledMessage(state);
   return Status::OK();
 }
 
 void SortNode::Codegen(RuntimeState* state) {
-  DCHECK(state->codegen_enabled());
+  DCHECK(state->ShouldCodegen());
   Status codegen_status = less_than_->Codegen(state);
   runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
   ExecNode::Codegen(state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exec/topn-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index e72249f..bd4caec 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -78,16 +78,12 @@ Status TopNNode::Prepare(RuntimeState* state) {
           *tuple_row_less_than_));
   materialized_tuple_desc_ = row_descriptor_.tuple_descriptors()[0];
   insert_batch_timer_ = ADD_TIMER(runtime_profile(), "InsertBatchTime");
-  if (!state->codegen_enabled()) {
-    runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
-  }
-
-runtime_profile()->AddCodegenMsg(false);
+  AddCodegenDisabledMessage(state);
   return Status::OK();
 }
 
 void TopNNode::Codegen(RuntimeState* state) {
-  DCHECK(state->codegen_enabled());
+  DCHECK(state->ShouldCodegen());
   LlvmCodeGen* codegen = state->codegen();
   DCHECK(codegen != NULL);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exprs/scalar-fn-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-fn-call.cc b/be/src/exprs/scalar-fn-call.cc
index 5457ced..c80cfcb 100644
--- a/be/src/exprs/scalar-fn-call.cc
+++ b/be/src/exprs/scalar-fn-call.cc
@@ -68,6 +68,18 @@ ScalarFnCall::ScalarFnCall(const TExprNode& node)
   DCHECK_NE(fn_.binary_type, TFunctionBinaryType::JAVA);
 }
 
+Status ScalarFnCall::LoadPrepareAndCloseFn(LlvmCodeGen* codegen) {
+  if (fn_.scalar_fn.__isset.prepare_fn_symbol) {
+    RETURN_IF_ERROR(GetFunction(codegen, fn_.scalar_fn.prepare_fn_symbol,
+        reinterpret_cast<void**>(&prepare_fn_)));
+  }
+  if (fn_.scalar_fn.__isset.close_fn_symbol) {
+    RETURN_IF_ERROR(GetFunction(codegen, fn_.scalar_fn.close_fn_symbol,
+        reinterpret_cast<void**>(&close_fn_)));
+  }
+  return Status::OK();
+}
+
 Status ScalarFnCall::Prepare(RuntimeState* state, const RowDescriptor& desc,
     ExprContext* context) {
   RETURN_IF_ERROR(Expr::Prepare(state, desc, context));
@@ -95,26 +107,33 @@ Status ScalarFnCall::Prepare(RuntimeState* state, const RowDescriptor& desc,
   fn_context_index_ =
       context->Register(state, return_type, arg_types, ComputeVarArgsBufferSize());
 
-  // Use the interpreted path and call the builtin without codegen if:
-  // 1. codegen is disabled or
+  // Use the interpreted path and call the builtin without codegen if any of the
+  // followings is true:
+  // 1. codegen is disabled by query option
   // 2. there are char arguments (as they aren't supported yet)
+  // 3. there is an optimization hint to disable codegen and UDF can be interpreted.
+  //    IR UDF or UDF with more than MAX_INTERP_ARGS number of fixed arguments
+  //    cannot be interpreted.
   //
   // TODO: codegen for char arguments
-  bool codegen_enabled = state->codegen_enabled();
-  if (!codegen_enabled || has_char_arg_or_result) {
-    if (fn_.binary_type == TFunctionBinaryType::IR) {
+  bool is_ir_udf = fn_.binary_type == TFunctionBinaryType::IR;
+  bool too_many_args_to_interp = NumFixedArgs() > MAX_INTERP_ARGS;
+  bool udf_interpretable = !is_ir_udf && !too_many_args_to_interp;
+  if (state->CodegenDisabledByQueryOption() || has_char_arg_or_result ||
+      (state->CodegenHasDisableHint() && udf_interpretable)) {
+    if (is_ir_udf) {
       // CHAR or VARCHAR are not supported as input arguments or return values for UDFs.
-      DCHECK(!has_char_arg_or_result && !codegen_enabled);
+      DCHECK(!has_char_arg_or_result && state->CodegenDisabledByQueryOption());
       return Status(Substitute("Cannot interpret LLVM IR UDF '$0': Codegen is needed. "
           "Please set DISABLE_CODEGEN to false.", fn_.name.function_name));
     }
 
     // The templates for builtin or native UDFs used in the interpretation path
-    // support up to 20 arguments only.
-    if (NumFixedArgs() > MAX_INTERP_ARGS) {
+    // support up to MAX_INTERP_ARGS number of arguments only.
+    if (too_many_args_to_interp) {
       DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::NATIVE);
       // CHAR or VARCHAR are not supported as input arguments or return values for UDFs.
-      DCHECK(!has_char_arg_or_result && !codegen_enabled);
+      DCHECK(!has_char_arg_or_result && state->CodegenDisabledByQueryOption());
       return Status(Substitute("Cannot interpret native UDF '$0': number of arguments is "
           "more than $1. Codegen is needed. Please set DISABLE_CODEGEN to false.",
           fn_.name.function_name, MAX_INTERP_ARGS));
@@ -134,48 +153,26 @@ Status ScalarFnCall::Prepare(RuntimeState* state, const RowDescriptor& desc,
       }
     }
   } else {
-    // If we got here, either codegen is enabled or we need codegen to run this function.
-    LlvmCodeGen* codegen = state->codegen();
-    DCHECK(codegen != NULL);
-
-    if (fn_.binary_type == TFunctionBinaryType::IR) {
-      string local_path;
-      RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-          fn_.hdfs_location, LibCache::TYPE_IR, &local_path));
-      // Link the UDF module into this query's main module (essentially copy the UDF
-      // module into the main module) so the UDF's functions are available in the main
-      // module.
-      RETURN_IF_ERROR(codegen->LinkModule(local_path));
-    }
-
-    Function* ir_udf_wrapper;
-    RETURN_IF_ERROR(GetCodegendComputeFn(codegen, &ir_udf_wrapper));
-    // TODO: don't do this for child exprs
-    codegen->AddFunctionToJit(ir_udf_wrapper, &scalar_fn_wrapper_);
-  }
-
-  if (fn_.scalar_fn.__isset.prepare_fn_symbol) {
-    RETURN_IF_ERROR(GetFunction(state, fn_.scalar_fn.prepare_fn_symbol,
-        reinterpret_cast<void**>(&prepare_fn_)));
-  }
-  if (fn_.scalar_fn.__isset.close_fn_symbol) {
-    RETURN_IF_ERROR(GetFunction(state, fn_.scalar_fn.close_fn_symbol,
-        reinterpret_cast<void**>(&close_fn_)));
+    // Add the expression to the list of expressions to codegen in the codegen phase.
+    state->AddScalarFnToCodegen(this);
   }
 
+  // For IR UDF, the loading of the Prepare() and Close() functions is deferred until
+  // first time GetCodegendComputeFn() is invoked.
+  if (!is_ir_udf) RETURN_IF_ERROR(LoadPrepareAndCloseFn(NULL));
   return Status::OK();
 }
 
 Status ScalarFnCall::Open(RuntimeState* state, ExprContext* ctx,
-                          FunctionContext::FunctionStateScope scope) {
+    FunctionContext::FunctionStateScope scope) {
   // Opens and inits children
   RETURN_IF_ERROR(Expr::Open(state, ctx, scope));
   FunctionContext* fn_ctx = ctx->fn_context(fn_context_index_);
 
-  if (scalar_fn_ != NULL) {
+  if (scalar_fn_wrapper_ == NULL) {
     // We're in the interpreted path (i.e. no JIT). Populate our FunctionContext's
     // staging_input_vals, which will be reused across calls to scalar_fn_.
-    DCHECK(scalar_fn_wrapper_ == NULL);
+    DCHECK(scalar_fn_ != NULL);
     vector<AnyVal*>* input_vals = fn_ctx->impl()->staging_input_vals();
     for (int i = 0; i < NumFixedArgs(); ++i) {
       AnyVal* input_val;
@@ -197,7 +194,7 @@ Status ScalarFnCall::Open(RuntimeState* state, ExprContext* ctx,
     fn_ctx->impl()->SetConstantArgs(move(constant_args));
   }
 
-  if (scalar_fn_ != NULL) {
+  if (scalar_fn_wrapper_ == NULL) {
     // Now we have the constant values, cache them so that the interpreted path can
     // call the UDF without reevaluating the arguments. 'staging_input_vals' and
     // 'varargs_buffer' in the FunctionContext are used to pass fixed and variable-length
@@ -307,6 +304,18 @@ Status ScalarFnCall::GetCodegendComputeFn(LlvmCodeGen* codegen, Function** fn) {
     }
   }
 
+  if (fn_.binary_type == TFunctionBinaryType::IR) {
+    string local_path;
+    RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
+        fn_.hdfs_location, LibCache::TYPE_IR, &local_path));
+    // Link the UDF module into this query's main module (essentially copy the UDF
+    // module into the main module) so the UDF's functions are available in the main
+    // module.
+    RETURN_IF_ERROR(codegen->LinkModule(local_path));
+    // Load the Prepare() and Close() functions from the LLVM module.
+    RETURN_IF_ERROR(LoadPrepareAndCloseFn(codegen));
+  }
+
   Function* udf;
   RETURN_IF_ERROR(GetUdf(codegen, &udf));
 
@@ -407,6 +416,8 @@ Status ScalarFnCall::GetCodegendComputeFn(LlvmCodeGen* codegen, Function** fn) {
         TErrorCode::UDF_VERIFY_FAILED, fn_.scalar_fn.symbol, fn_.hdfs_location);
   }
   ir_compute_fn_ = *fn;
+  // TODO: don't do this for child exprs
+  codegen->AddFunctionToJit(ir_compute_fn_, &scalar_fn_wrapper_);
   return Status::OK();
 }
 
@@ -518,14 +529,13 @@ Status ScalarFnCall::GetUdf(LlvmCodeGen* codegen, Function** udf) {
   return Status::OK();
 }
 
-Status ScalarFnCall::GetFunction(RuntimeState* state, const string& symbol, void** fn) {
+Status ScalarFnCall::GetFunction(LlvmCodeGen* codegen, const string& symbol, void** fn) {
   if (fn_.binary_type == TFunctionBinaryType::NATIVE ||
       fn_.binary_type == TFunctionBinaryType::BUILTIN) {
     return LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, symbol, fn,
-                                                  &cache_entry_);
+        &cache_entry_);
   } else {
     DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::IR);
-    LlvmCodeGen* codegen = state->codegen();
     DCHECK(codegen != NULL);
     Function* ir_fn = codegen->GetFunction(symbol, false);
     if (ir_fn == NULL) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/exprs/scalar-fn-call.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-fn-call.h b/be/src/exprs/scalar-fn-call.h
index 85191d5..530f1b1 100644
--- a/be/src/exprs/scalar-fn-call.h
+++ b/be/src/exprs/scalar-fn-call.h
@@ -54,6 +54,7 @@ class ScalarFnCall: public Expr {
 
  protected:
   friend class Expr;
+  friend class RuntimeState;
 
   ScalarFnCall(const TExprNode& node);
   virtual Status Prepare(RuntimeState* state, const RowDescriptor& desc,
@@ -123,8 +124,13 @@ class ScalarFnCall: public Expr {
 
   /// Loads the native or IR function 'symbol' from HDFS and puts the result in *fn.
   /// If the function is loaded from an IR module, it cannot be called until the module
-  /// has been JIT'd (i.e. after Prepare() has completed).
-  Status GetFunction(RuntimeState* state, const std::string& symbol, void** fn);
+  /// has been JIT'd (i.e. after GetCodegendComputeFn() has been called).
+  Status GetFunction(LlvmCodeGen* codegen, const std::string& symbol, void** fn);
+
+  /// Loads the Prepare() and Close() functions for this ScalarFnCall. They could be
+  /// native or IR functions. To load IR functions, the codegen object must have
+  /// been created and any external LLVM module must have been linked already.
+  Status LoadPrepareAndCloseFn(LlvmCodeGen* codegen);
 
   /// Evaluates the non-constant children exprs. Used in the interpreted path.
   void EvaluateNonConstantChildren(ExprContext* context, const TupleRow* row);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 13ea662..9db4d38 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -207,14 +207,11 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
     scan_node->SetScanRanges(scan_ranges);
   }
 
-  RuntimeState* state = runtime_state_.get();
+  RuntimeState* state = runtime_state();
   RuntimeProfile::Counter* prepare_timer =
       ADD_CHILD_TIMER(timings_profile_, "ExecTreePrepareTime", PREPARE_TIMER_NAME);
   {
     SCOPED_TIMER(prepare_timer);
-    // Until IMPALA-4233 is fixed, we still need to create the codegen object before
-    // Prepare() as ScalarFnCall::Prepare() may codegen.
-    if (state->codegen_enabled()) RETURN_IF_ERROR(state->CreateCodegen());
     RETURN_IF_ERROR(exec_tree_->Prepare(state));
   }
 
@@ -242,7 +239,14 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
     ReleaseThreadToken();
   }
 
-  if (state->codegen_enabled()) exec_tree_->Codegen(state);
+  if (state->ShouldCodegen()) {
+    RETURN_IF_ERROR(state->CreateCodegen());
+    exec_tree_->Codegen(state);
+    // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed,
+    // ScalarFnCall has no fall back to interpretation when codegen fails so propagates
+    // the error status for now.
+    RETURN_IF_ERROR(state->CodegenScalarFns());
+  }
 
   // set up profile counters
   profile()->AddChild(exec_tree_->runtime_profile());
@@ -251,14 +255,14 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
   per_host_mem_usage_ =
       ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES);
 
-  row_batch_.reset(new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),
-      runtime_state_->instance_mem_tracker()));
+  row_batch_.reset(new RowBatch(exec_tree_->row_desc(), state->batch_size(),
+      state->instance_mem_tracker()));
   VLOG(2) << "plan_root=\n" << exec_tree_->DebugString();
   return Status::OK();
 }
 
 void PlanFragmentExecutor::OptimizeLlvmModule() {
-  if (!runtime_state_->codegen_enabled()) return;
+  if (!runtime_state_->ShouldCodegen()) return;
   LlvmCodeGen* codegen = runtime_state_->codegen();
   DCHECK(codegen != NULL);
   Status status = codegen->FinalizeModule();

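The reordering in PrepareInternal() can be illustrated with a small self-contained sketch. FakeRuntimeState, PrepareExecTree() and PrepareInternalSketch() are hypothetical stand-ins, not Impala classes. The sketch also assumes, as a simplification, that only expressions which cannot be interpreted register themselves during Prepare(). It shows that the codegen object is now created only after Prepare() has run, and only when ShouldCodegen() holds.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for RuntimeState; names mirror the diff, but this is not Impala code.
struct FakeRuntimeState {
  bool disable_codegen_option = false;  // user-visible query option
  bool disable_codegen_hint = false;    // advisory hint (planner or fe-support)
  int scalar_fns_to_codegen = 0;        // expressions registered during Prepare()
  bool codegen_created = false;
  std::vector<std::string> log;         // records the order in which steps ran

  bool ShouldCodegen() const {
    // The hint is ignored if some expression must be codegen'd.
    bool disabled_by_hint = disable_codegen_hint && scalar_fns_to_codegen == 0;
    return !disable_codegen_option && !disabled_by_hint;
  }
};

// Hypothetical Prepare(): expressions that cannot be interpreted register themselves.
void PrepareExecTree(FakeRuntimeState* state, int uninterpretable_udfs) {
  state->log.push_back("Prepare");
  state->scalar_fns_to_codegen += uninterpretable_udfs;
}

// Mirrors the new ordering: Prepare() first, then create the codegen object on demand.
void PrepareInternalSketch(FakeRuntimeState* state, int uninterpretable_udfs) {
  PrepareExecTree(state, uninterpretable_udfs);
  if (state->ShouldCodegen()) {
    state->codegen_created = true;
    state->log.push_back("CreateCodegen");
    state->log.push_back("Codegen");
    state->log.push_back("CodegenScalarFns");
  }
}
```

With the hint set and no registered UDFs, the sketch skips codegen entirely, which is the latency saving the commit message describes. Registering one UDF makes the hint irrelevant and codegen runs after Prepare().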
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 00ccea1..c2adcb7 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -30,6 +30,7 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "exprs/expr.h"
+#include "exprs/scalar-fn-call.h"
 #include "runtime/buffered-block-mgr.h"
 #include "runtime/descriptors.h"
 #include "runtime/data-stream-mgr.h"
@@ -190,6 +191,14 @@ Status RuntimeState::CreateCodegen() {
   return Status::OK();
 }
 
+Status RuntimeState::CodegenScalarFns() {
+  for (ScalarFnCall* scalar_fn : scalar_fns_to_codegen_) {
+    Function* fn;
+    RETURN_IF_ERROR(scalar_fn->GetCodegendComputeFn(codegen_.get(), &fn));
+  }
+  return Status::OK();
+}
+
 string RuntimeState::ErrorLog() {
   lock_guard<SpinLock> l(error_log_lock_);
   return PrintErrorMapToString(error_log_);

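CodegenScalarFns() returns at the first failure because of RETURN_IF_ERROR, so the error is propagated rather than falling back to interpretation, as the comment in PrepareInternal() notes for IMPALA-4233. The sketch below models that loop with a minimal, hypothetical Status type and a simplified SKETCH_RETURN_IF_ERROR macro. Impala's real Status and RETURN_IF_ERROR are richer, and the real GetCodegendComputeFn() also takes an LlvmCodeGen* and a Function** output parameter.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Minimal hypothetical Status: an empty message means OK.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
  static Status OK() { return Status{}; }
};

// Simplified form of the RETURN_IF_ERROR pattern: return early on the first error.
#define SKETCH_RETURN_IF_ERROR(stmt) \
  do {                               \
    Status _s = (stmt);              \
    if (!_s.ok()) return _s;         \
  } while (false)

// Hypothetical expression whose codegen fails when 'fails' is set.
struct FakeScalarFn {
  std::string name;
  bool fails;
  int* attempts;  // counts how many codegen attempts were made
  Status GetCodegendComputeFn() const {
    ++*attempts;
    return fails ? Status{"codegen failed for " + name} : Status::OK();
  }
};

// Mirrors the loop in RuntimeState::CodegenScalarFns().
Status CodegenScalarFnsSketch(const std::vector<FakeScalarFn>& fns) {
  for (const FakeScalarFn& fn : fns) {
    SKETCH_RETURN_IF_ERROR(fn.GetCodegendComputeFn());
  }
  return Status::OK();
}
```

Expressions after the failing one are never attempted, so a single bad UDF fails the whole fragment's Prepare phase.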
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 0d612db..0ada7e4 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -42,6 +42,7 @@ class LlvmCodeGen;
 class MemTracker;
 class ObjectPool;
 class RuntimeFilterBank;
+class ScalarFnCall;
 class Status;
 class TimestampValue;
 class TQueryOptions;
@@ -155,12 +156,48 @@ class RuntimeState {
   /// Returns runtime state profile
   RuntimeProfile* runtime_profile() { return &profile_; }
 
-  /// Returns true if codegen is enabled for this query.
-  bool codegen_enabled() const { return !query_options().disable_codegen; }
-
   /// Returns the LlvmCodeGen object for this fragment instance.
   LlvmCodeGen* codegen() { return codegen_.get(); }
 
+  /// Add ScalarFnCall expression 'udf' to be codegen'd later if it's not disabled by
+  /// query option. This is for cases in which the UDF cannot be interpreted or if the
+  /// plan fragment doesn't contain any codegen enabled operator.
+  void AddScalarFnToCodegen(ScalarFnCall* udf) { scalar_fns_to_codegen_.push_back(udf); }
+
+  /// Returns true if there are ScalarFnCall expressions in the fragments which can't be
+  /// interpreted. This should only be used after the Prepare() phase in which all
+  /// expressions' Prepare() are invoked.
+  bool ScalarFnNeedsCodegen() const { return !scalar_fns_to_codegen_.empty(); }
+
+  /// Returns true if there is a hint to disable codegen. This can be true for single node
+  /// optimization or expression evaluation request from FE to BE (see fe-support.cc).
+  /// Note that this internal flag is advisory and it may be ignored if the fragment has
+  /// any UDF which cannot be interpreted. See ScalarFnCall::Prepare() for details.
+  inline bool CodegenHasDisableHint() const {
+    return query_ctx().disable_codegen_hint;
+  }
+
+  /// Returns true iff there is a hint to disable codegen and all expressions in the
+  /// fragment can be interpreted. This should only be used after the Prepare() phase
+  /// in which all expressions' Prepare() are invoked.
+  inline bool CodegenDisabledByHint() const {
+    return CodegenHasDisableHint() && !ScalarFnNeedsCodegen();
+  }
+
+  /// Returns true if codegen is disabled by query option.
+  inline bool CodegenDisabledByQueryOption() const {
+    return query_options().disable_codegen;
+  }
+
+  /// Returns true if codegen should be enabled for this fragment. Codegen is enabled
+  /// if all the following conditions hold:
+  /// 1. it's enabled by query option
+  /// 2. it's not disabled by internal hints or there are expressions in the fragment
+  ///    which cannot be interpreted.
+  inline bool ShouldCodegen() const {
+    return !CodegenDisabledByQueryOption() && !CodegenDisabledByHint();
+  }
+
   /// Takes ownership of a scan node's reader context and plan fragment executor will call
   /// UnregisterReaderContexts() to unregister it when the fragment is closed. The IO
   /// buffers may still be in use and thus the deferred unregistration.
@@ -257,6 +294,12 @@ class RuntimeState {
   /// Create a codegen object accessible via codegen() if it doesn't exist already.
   Status CreateCodegen();
 
+  /// Codegen all ScalarFnCall expressions in 'scalar_fns_to_codegen_'. If codegen fails
+  /// for any expressions, return immediately with the error status. Once IMPALA-4233 is
+  /// fixed, it's not fatal to fail codegen if the expression can be interpreted.
+  /// TODO: Fix IMPALA-4233
+  Status CodegenScalarFns();
+
  private:
   /// Allow TestEnv to set block_mgr manually for testing.
   friend class TestEnv;
@@ -292,6 +335,9 @@ class RuntimeState {
   ExecEnv* exec_env_;
   boost::scoped_ptr<LlvmCodeGen> codegen_;
 
+  /// Contains all ScalarFnCall expressions which need to be codegen'd.
+  vector<ScalarFnCall*> scalar_fns_to_codegen_;
+
   /// Thread resource management object for this fragment's execution.  The runtime
   /// state is responsible for returning this pool to the thread mgr.
   ThreadResourceMgr::ResourcePool* resource_pool_;

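The predicates added to runtime-state.h combine into a small decision rule. The sketch below restates them as free functions over a hypothetical CodegenInputs struct (not part of Impala) so every combination can be checked directly; the field comments name the RuntimeState accessors each flag stands in for.

```cpp
#include <cassert>

// Hypothetical inputs mirroring the three facts RuntimeState consults.
struct CodegenInputs {
  bool disable_codegen_option;   // query_options().disable_codegen
  bool disable_codegen_hint;     // query_ctx().disable_codegen_hint
  bool scalar_fn_needs_codegen;  // !scalar_fns_to_codegen_.empty()
};

// Mirrors CodegenDisabledByHint(): the hint is advisory and is ignored
// when some expression cannot be interpreted.
bool CodegenDisabledByHint(const CodegenInputs& in) {
  return in.disable_codegen_hint && !in.scalar_fn_needs_codegen;
}

// Mirrors ShouldCodegen(): the query option always wins; the hint only
// applies when it is safe to honor.
bool ShouldCodegen(const CodegenInputs& in) {
  return !in.disable_codegen_option && !CodegenDisabledByHint(in);
}
```

When the query option disables codegen but a UDF needs it, this rule still yields false; the commit message says an error is thrown in that case, and the header comments point to ScalarFnCall::Prepare() for details.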
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 7147964..52beda5 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -85,24 +85,9 @@ Java_org_apache_impala_service_FeSupport_NativeEvalConstExprs(
   DeserializeThriftMsg(env, thrift_expr_batch, &expr_batch);
   DeserializeThriftMsg(env, thrift_query_ctx_bytes, &query_ctx);
   vector<TExpr>& texprs = expr_batch.exprs;
-
-  // Codegen is almost always disabled in this path. The only exception is when the
-  // expression contains IR UDF which cannot be interpreted. Enable codegen in this
-  // case if codegen is not disabled in the query option. Otherwise, we will let it
-  // fail in ScalarFnCall::Prepare().
-  bool need_codegen = false;
-  for (const TExpr& texpr : texprs) {
-    if (Expr::NeedCodegen(texpr)) {
-      need_codegen = true;
-      break;
-    }
-  }
-  query_ctx.request.query_options.disable_codegen |= !need_codegen;
+  // Disable codegen advisorily to avoid unnecessary latency.
+  query_ctx.disable_codegen_hint = true;
   RuntimeState state(query_ctx);
-  if (!query_ctx.request.query_options.disable_codegen) {
-    THROW_IF_ERROR_RET(
-        state.CreateCodegen(), env, JniUtil::internal_exc_class(), result_bytes);
-  }
 
   THROW_IF_ERROR_RET(jni_frame.push(env), env, JniUtil::internal_exc_class(),
       result_bytes);
@@ -110,7 +95,6 @@ Java_org_apache_impala_service_FeSupport_NativeEvalConstExprs(
   // preparing/running the exprs.
   state.InitMemTrackers(TUniqueId(), NULL, -1);
 
-  // Prepare the exprs
   vector<ExprContext*> expr_ctxs;
   for (const TExpr& texpr : texprs) {
     ExprContext* ctx;
@@ -121,9 +105,13 @@ Java_org_apache_impala_service_FeSupport_NativeEvalConstExprs(
     expr_ctxs.push_back(ctx);
   }
 
-  if (!query_ctx.request.query_options.disable_codegen) {
+  // UDFs which cannot be interpreted need to be handled by codegen.
+  if (state.ScalarFnNeedsCodegen()) {
+    THROW_IF_ERROR_RET(
+        state.CreateCodegen(), env, JniUtil::internal_exc_class(), result_bytes);
     LlvmCodeGen* codegen = state.codegen();
     DCHECK(codegen != NULL);
+    state.CodegenScalarFns();
     codegen->EnableOptimizations(false);
     codegen->FinalizeModule();
   }

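Because NativeEvalConstExprs() now always sets disable_codegen_hint, the general ShouldCodegen() rule reduces to ScalarFnNeedsCodegen() whenever the query option leaves codegen enabled. That matches the condition the FE path checks before calling CreateCodegen(). The sketch below checks that equivalence; both functions are hypothetical restatements of the logic in the diff, not Impala code.

```cpp
#include <cassert>

// Restatement of RuntimeState::ShouldCodegen() with explicit inputs.
bool ShouldCodegen(bool option_disables, bool hint, bool needs_codegen) {
  bool disabled_by_hint = hint && !needs_codegen;
  return !option_disables && !disabled_by_hint;
}

// Condition used in NativeEvalConstExprs() before CreateCodegen().
bool FeSupportCreatesCodegen(bool needs_codegen) { return needs_codegen; }
```

The FE path does not check the query option itself. The commit message says an error is thrown when codegen is needed but disabled by the user, rather than this path deciding it.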
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 25af712..4f81e27 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -314,6 +314,11 @@ struct TQueryCtx {
 
   // Milliseconds since UNIX epoch at the start of query execution.
   13: required i64 start_unix_millis
+
+  // Hint to disable codegen. Set by planner for single-node optimization or by the
+  // backend in NativeEvalConstExprs() in FESupport. This flag is only advisory to
+  // avoid the overhead of codegen and can be ignored if codegen is needed functionally.
+  14: optional bool disable_codegen_hint = false;
 }
 
 // Context to collect information, which is shared among all instances of that plan

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 658fd0a..acfafb3 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -102,7 +102,7 @@ public class Planner {
     if (isSmallQuery) {
       // Execute on a single node and disable codegen for small results
       ctx_.getQueryOptions().setNum_nodes(1);
-      ctx_.getQueryOptions().setDisable_codegen(true);
+      ctx_.getQueryCtx().disable_codegen_hint = true;
       if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
           maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) {
         // Only one scanner thread for small queries

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b7eeb8bf/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 31c4b6e..d5a7cba 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -38,8 +38,8 @@ class TestUdfs(ImpalaTestSuite):
   def add_test_dimensions(cls):
     super(TestUdfs, cls).add_test_dimensions()
     cls.TestMatrix.add_dimension(
-      create_exec_option_dimension(disable_codegen_options=[False, True]))
-
+      create_exec_option_dimension(disable_codegen_options=[False, True],
+                                   exec_single_node_option=[0,100]))
     # There is no reason to run these tests using all dimensions.
     cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
 


