Date: Tue, 11 Nov 2014 17:03:34 +0000 (UTC)
From: "Carlos Balduz (JIRA)"
To: pig-dev@hadoop.apache.org
Subject: [jira] [Created] (PIG-4323) PackageConverter hanging in Spark

Carlos Balduz created PIG-4323:
----------------------------------

             Summary: PackageConverter hanging in Spark
                 Key: PIG-4323
                 URL: https://issues.apache.org/jira/browse/PIG-4323
             Project: Pig
          Issue Type: Bug
          Components: spark
            Reporter: Carlos Balduz
            Assignee: Carlos Balduz

After the patch introduced in PIG-4237, PackageConverter hangs on large jobs: it deserializes the JobConf every time apply is called, which is prohibitively slow:

{code:java}
public Tuple apply(final Tuple t) {
    initializeJobConf();
    ...
{code}

After the job had been stuck for 30 minutes, I took a thread dump and saw that the main culprit was:

{code:java}
Thread 11128: (state = IN_JAVA)
 - java.util.zip.InflaterInputStream.read(byte[], int, int) @bci=53, line=152 (Compiled frame; information may be imprecise)
 - java.util.zip.GZIPInputStream.read(byte[], int, int) @bci=17, line=116 (Compiled frame)
 - org.apache.hadoop.io.WritableUtils.readCompressedByteArray(java.io.DataInput) @bci=65, line=44 (Compiled frame)
 - org.apache.hadoop.io.WritableUtils.readCompressedString(java.io.DataInput) @bci=1, line=87 (Compiled frame)
 - org.apache.hadoop.io.WritableUtils.readCompressedStringArray(java.io.DataInput) @bci=29, line=185 (Compiled frame)
 - org.apache.hadoop.conf.Configuration.readFields(java.io.DataInput) @bci=37, line=2564 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer.deserializeJobConf(byte[]) @bci=24, line=80 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.initializeJobConf() @bci=4, line=60 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.apply(org.apache.pig.data.Tuple) @bci=1, line=67 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.apply(java.lang.Object) @bci=5, line=48 (Compiled frame)
 - scala.collection.Iterator$$anon$11.next() @bci=13, line=328 (Compiled frame)
 - scala.collection.convert.Wrappers$IteratorWrapper.next() @bci=4, line=30 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.readNext() @bci=44, line=61 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.readNext() @bci=162, line=78 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.hasNext() @bci=1, line=91 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.readNext() @bci=133, line=75 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.hasNext() @bci=1, line=91 (Compiled frame)
 - scala.collection.convert.Wrappers$JIteratorWrapper.hasNext() @bci=4, line=41 (Compiled frame)
 - scala.collection.Iterator$$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
 - scala.collection.Iterator$$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
 - scala.collection.Iterator$class.foreach(scala.collection.Iterator, scala.Function1) @bci=1, line=727 (Compiled frame)
 - scala.collection.AbstractIterator.foreach(scala.Function1) @bci=2, line=1157 (Interpreted frame)
 - org.apache.spark.shuffle.hash.HashShuffleWriter.write(scala.collection.Iterator) @bci=95, line=65 (Interpreted frame)
 - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=170, line=68 (Interpreted frame)
 - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame)
 - org.apache.spark.scheduler.Task.run(long) @bci=67, line=54 (Interpreted frame)
 - org.apache.spark.executor.Executor$TaskRunner.run() @bci=336, line=177 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
{code}

I have added a condition that checks whether the JobConf has already been initialized, which makes it work again.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
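The fix described above is just a null check in front of the expensive deserialization, so it runs once per function instance instead of once per tuple. A minimal, self-contained sketch of that guard (class and method names here are simplified stand-ins, since the real PackageConverter$PackageFunction depends on Pig and Spark classes that are not shown in this issue):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for PackageConverter$PackageFunction. The real class
// calls KryoSerializer.deserializeJobConf(confBytes); here we only count
// how many times the expensive initialization actually runs.
class PackageFunctionSketch {
    private final AtomicInteger deserializations = new AtomicInteger();
    private Object jobConf; // stands in for the deserialized JobConf

    // Guard: only pay the deserialization cost on the first call.
    private void initializeJobConf() {
        if (jobConf == null) {
            jobConf = expensiveDeserialize();
        }
    }

    private Object expensiveDeserialize() {
        deserializations.incrementAndGet();
        return new Object(); // real code: KryoSerializer.deserializeJobConf(...)
    }

    public String apply(String tuple) {
        initializeJobConf(); // a no-op after the first tuple
        return tuple;
    }

    public int deserializationCount() {
        return deserializations.get();
    }
}

public class Main {
    public static void main(String[] args) {
        PackageFunctionSketch fn = new PackageFunctionSketch();
        for (int i = 0; i < 1000; i++) {
            fn.apply("t" + i);
        }
        System.out.println(fn.deserializationCount()); // prints 1, not 1000
    }
}
```

Without the guard, every tuple would trigger a full GZIP-decompress-and-readFields pass over the serialized configuration, which is exactly what the thread dump shows the worker stuck in.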