Subject: OutOfMemoryError during reduce shuffle while reading Orc file
From: wzc
To: user@hive.apache.org
Date: Fri, 9 May 2014 14:12:23 +0800

Recently we have been converting some data warehouse tables from textfile to ORC format. Some of our Hive SQL queries that read these ORC tables failed at the reduce stage. The reducers failed while copying map outputs with the following exception:

Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
        at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
        at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
        at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297)
        at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)


The affected SQL statements have the following form:

set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.exec.max.dynamic.partitions=10000;
insert overwrite table xxx partition(`day`)
select
    count(distinct col1) c1,
    count(distinct col2) c2,
    ...
    count(distinct col11) col11
from t
group by col12, col13


Here we consider one specific job with 24G totalInputFileSize (ORC compressed). It launches 97 maps (mapred.max.split.size is 256M) and 30 reduces (hive.exec.reducers.bytes.per.reducer = 1G).

Since there are so many count(distinct) aggregations, the total reduce shuffle bytes grow to 59G (LZO compressed, around 550G decompressed). The average map output segment each reducer fetches from a single map is 550 * 1024 / 97 / 30 ≈ 193M. Here I notice two default parameters which control the memory usage of the shuffle process:

mapreduce.reduce.shuffle.input.buffer.percent = 0.9
mapreduce.reduce.shuffle.memory.limit.percent = 0.25

The memoryLimit and maxSingleShuffleLimit are computed as follows:
memoryLimit = total_memory * $mapreduce.reduce.shuffle.input.buffer.percent
maxSingleShuffleLimit = memoryLimit * $mapreduce.reduce.shuffle.memory.limit.percent
Here maxSingleShuffleLimit is the per-segment threshold for shuffling a map output into memory; larger segments go straight to disk.

From the log we can find all the runtime params:

2014-05-04 16:39:27,129 INFO [main] org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl: MergerManager: memoryLimit=1336252800, maxSingleShuffleLimit=334063200, mergeThreshold=881926912, ioSortFactor=10, memToMemMergeOutputsThreshold=10
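
As a sanity check, these logged values are consistent with the formulas above. Here is a throwaway sketch I used to back out the numbers (the class name and the derived heap figure are mine, not taken from the job config):

public class ShuffleLimitsCheck {
    public static void main(String[] args) {
        // Value copied from the MergeManagerImpl log line above.
        long memoryLimit = 1336252800L;

        // Defaults quoted earlier in this mail.
        double inputBufferPercent = 0.9;   // mapreduce.reduce.shuffle.input.buffer.percent
        double memoryLimitPercent = 0.25;  // mapreduce.reduce.shuffle.memory.limit.percent

        // memoryLimit = total_memory * inputBufferPercent, so the heap available
        // to the shuffle is roughly memoryLimit / 0.9.
        long impliedTotalMemory = (long) (memoryLimit / inputBufferPercent);  // ~1,484,725,333 bytes (~1.38 GiB)

        // maxSingleShuffleLimit = memoryLimit * memoryLimitPercent.
        long singleShuffleLimit = (long) (memoryLimit * memoryLimitPercent);  // 334,063,200, matching the log

        System.out.printf("implied total_memory  = %,d bytes%n", impliedTotalMemory);
        System.out.printf("maxSingleShuffleLimit = %,d bytes%n", singleShuffleLimit);
    }
}

Note that the ~193M average segment size computed above is well below maxSingleShuffleLimit (~318 MiB), so these segments all qualify for the in-memory shuffle path.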


If the memory already in use is close to memoryLimit and we shuffle one more map output into memory, the total memory used may exceed the reducer's heap (total_memory) under this configuration:

total_memory_used = memoryLimit + maxSingleShuffleLimit
                  = total_memory * input_buffer_percent * (1 + memory_limit_percent)
                  = total_memory * 0.9 * 1.25
                  = total_memory * 1.125
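
My reading of the reserve logic (a simplified paraphrase, not the actual MergeManagerImpl source; the type and field names below are mine) is that the in-memory check is done before the new segment's size is counted, which is exactly what allows the overshoot:

enum Placement { DISK, MEMORY, WAIT }

class ShuffleReserveSketch {
    long memoryLimit;            // total_memory * input.buffer.percent
    long maxSingleShuffleLimit;  // memoryLimit * memory.limit.percent
    long usedMemory;             // bytes currently held by in-memory map outputs

    Placement reserve(long requestedSize) {
        if (requestedSize > maxSingleShuffleLimit) {
            return Placement.DISK;     // oversized segments bypass memory entirely
        }
        if (usedMemory > memoryLimit) {
            return Placement.WAIT;     // fetcher stalls until the in-memory merge frees space
        }
        // usedMemory may already be just under memoryLimit, yet we still accept up to
        // maxSingleShuffleLimit more bytes here -- hence the 1.125x worst case above.
        usedMemory += requestedSize;
        return Placement.MEMORY;
    }
}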

When I set mapreduce.reduce.shuffle.input.buffer.percent to 0.6, the job runs well.
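
For reference, redoing the arithmetic with 0.6 (using the heap size implied by the logged memoryLimit, which is my back-calculation rather than a value from the job) shows why there is now headroom:

public class LoweredBufferCheck {
    public static void main(String[] args) {
        long totalMemory = 1484725333L;    // heap implied by logged memoryLimit / 0.9
        double inputBufferPercent = 0.6;   // lowered mapreduce.reduce.shuffle.input.buffer.percent
        double memoryLimitPercent = 0.25;  // unchanged default

        long memoryLimit = (long) (totalMemory * inputBufferPercent);            // ~890,835,199
        long maxSingleShuffleLimit = (long) (memoryLimit * memoryLimitPercent);  // ~222,708,799
        long worstCase = memoryLimit + maxSingleShuffleLimit;                    // ~1,113,543,998, i.e. 0.75 * heap

        System.out.printf("worst-case in-memory usage = %,d of %,d bytes%n", worstCase, totalMemory);
    }
}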


Here are my questions:
1. Are the default shuffle settings suitable, or am I missing something?
2. Although the job uses fewer maps and reduces after we compress the data with ORC, it runs slower than before. When I increase the number of reducers it takes less time. I wonder whether we could improve the estimateNumberOfReducers algorithm to take the input data format into consideration?

Any help is appreciated.