Date: Tue, 14 Oct 2014 17:40:34 +0000 (UTC)
From: "Chao (JIRA)"
To: hive-dev@hadoop.apache.org
Subject: [jira] [Updated] (HIVE-8457) MapOperator initialization when multiple Spark threads is enabled. [Spark Branch]

    [ https://issues.apache.org/jira/browse/HIVE-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chao updated HIVE-8457:
-----------------------
    Description: 
Currently, on the Spark branch, each thread is bound to a thread-local IOContext, which gets initialized when we generate an input {{HadoopRDD}}, and is later used in {{MapOperator}}, {{FilterOperator}}, etc.
With the introduction of HIVE-8118, we may have multiple downstream RDDs that share the same input {{HadoopRDD}}, and we would like the {{HadoopRDD}} to be cached, to avoid scanning the same table multiple times. A typical case looks like the following:

{noformat}
  inputRDD    inputRDD
     |           |
   MT_11       MT_12
     |           |
   RT_1        RT_2
{noformat}

Here, {{MT_11}} and {{MT_12}} are {{MapTran}} from a split {{MapWork}}, and {{RT_1}} and {{RT_2}} are two {{ReduceTran}}. Note that this example is simplified: we may also have a {{ShuffleTran}} between {{MapTran}} and {{ReduceTran}}.

When multiple Spark threads are running, {{MT_11}} may be executed first, and it will ask for an iterator from the {{HadoopRDD}}. This will trigger the creation of the iterator, which in turn triggers the initialization of the {{IOContext}} associated with that particular thread.

*Now, the problem is*: when {{MT_12}} starts executing, it will also ask for an iterator from the {{HadoopRDD}}, and since the RDD is already cached, instead of creating a new iterator, it will just fetch the cached result. However, this skips the initialization of the {{IOContext}} associated with this particular thread. So, when {{MT_12}} tries to initialize the {{MapOperator}}, the {{IOContext}} is not initialized, and this fails miserably.

  was:
Currently, on the Spark branch, each thread is bound to a thread-local IOContext, which gets initialized when we generate an input {{HadoopRDD}}, and is later used in {{MapOperator}}, {{FilterOperator}}, etc. With the introduction of HIVE-8118, we may have multiple downstream RDDs that share the same input {{HadoopRDD}}, and we would like the {{HadoopRDD}} to be cached, to avoid scanning the same table multiple times.
A typical case looks like the following:

{noformat}
  inputRDD    inputRDD
     |           |
   MT_11       MT_12
     |           |
   RT_1        RT_2
{noformat}

Here, {{MT_11}} and {{MT_12}} are {{MapTran}} from a split {{MapWork}}, and {{RT_1}} and {{RT_2}} are two {{ReduceTran}}. Note that this example is simplified: we may also have a {{ShuffleTran}} between {{MapTran}} and {{ReduceTran}}.

When multiple Spark threads are running, {{MT_11}} may be executed first, and it will ask for an iterator from the {{HadoopRDD}}. This will trigger the creation of the iterator, which in turn triggers the initialization of the {{IOContext}} associated with that particular thread. Now, before {{MT_12}} starts executing, it will also ask for an iterator from the {{HadoopRDD}}, and since the RDD is already cached, instead of creating a new iterator, it will just fetch the cached result. However, the problem is that this skips the initialization of the IOContext associated with this particular thread. When {{MT_12}} starts executing, it will first initialize the {{MapOperator}}, but since the {{IOContext}} is not initialized, this fails miserably.

> MapOperator initialization when multiple Spark threads is enabled. [Spark Branch]
> ---------------------------------------------------------------------------------
>
>                 Key: HIVE-8457
>                 URL: https://issues.apache.org/jira/browse/HIVE-8457
>             Project: Hive
>          Issue Type: Bug
>          Components: Spark
>            Reporter: Chao
>

-- 
This message was sent by Atlassian JIRA
(v6.3.4#6332)
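The failure mode described in this issue can be sketched with a minimal, hypothetical Java example. The names here ({{IoContextSketch}}, {{CachedRdd}}, {{IO_CONTEXT}}) are illustrative stand-ins, not Hive's or Spark's actual classes: the first thread computes the "RDD" and initializes its thread-local context as a side effect of iterator creation; the second thread hits the cache, so the side effect never runs and its context stays null.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the bug in HIVE-8457; class and field names are
// illustrative, not Hive's real IOContext/HadoopRDD implementation.
public class IoContextSketch {
    // Each thread gets its own context, mirroring the thread-local IOContext.
    static final ThreadLocal<String> IO_CONTEXT = new ThreadLocal<>();

    // A toy "RDD" that caches its computed result after the first iterator request.
    static class CachedRdd {
        private List<Integer> cached;

        synchronized List<Integer> iterator() {
            if (cached == null) {
                // First request: compute the data AND initialize the
                // requesting thread's context as a side effect.
                IO_CONTEXT.set("initialized");
                cached = Arrays.asList(1, 2, 3);
            }
            // Cache hit: the data is returned, but the side effect
            // (context initialization) never runs for this thread.
            return cached;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CachedRdd rdd = new CachedRdd();
        AtomicReference<String> t2Context = new AtomicReference<>();

        Thread t1 = new Thread(() -> rdd.iterator());   // plays the role of MT_11
        t1.start();
        t1.join();

        Thread t2 = new Thread(() -> {                  // plays the role of MT_12
            rdd.iterator();                             // cache hit, no init
            t2Context.set(IO_CONTEXT.get());            // still null on this thread
        });
        t2.start();
        t2.join();

        // Thread 2 saw the cached data, but its context was never initialized,
        // which is why MapOperator initialization fails in the real code path.
        System.out.println("thread-2 context = " + t2Context.get());
    }
}
```

Running the sketch prints `thread-2 context = null`, showing that caching decoupled the data from the per-thread initialization it used to piggyback on.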