pig-dev mailing list archives

From "Carlos Balduz (JIRA)" <j...@apache.org>
Subject [jira] [Created] (PIG-4323) PackageConverter hanging in Spark
Date Tue, 11 Nov 2014 17:03:34 GMT
Carlos Balduz created PIG-4323:
----------------------------------

             Summary: PackageConverter hanging in Spark
                 Key: PIG-4323
                 URL: https://issues.apache.org/jira/browse/PIG-4323
             Project: Pig
          Issue Type: Bug
          Components: spark
            Reporter: Carlos Balduz
            Assignee: Carlos Balduz


After the patch introduced in PIG-4237, PackageConverter hangs on large jobs: it deserializes the JobConf every time apply is called, which makes it far too slow:

{code:java}
public Tuple apply(final Tuple t) {
    initializeJobConf();
    ...
{code}

After a job had been stuck for 30 minutes, I took a thread dump and saw that the main culprit was:

{code:java}
Thread 11128: (state = IN_JAVA)
 - java.util.zip.InflaterInputStream.read(byte[], int, int) @bci=53, line=152 (Compiled frame; information may be imprecise)
 - java.util.zip.GZIPInputStream.read(byte[], int, int) @bci=17, line=116 (Compiled frame)
 - org.apache.hadoop.io.WritableUtils.readCompressedByteArray(java.io.DataInput) @bci=65, line=44 (Compiled frame)
 - org.apache.hadoop.io.WritableUtils.readCompressedString(java.io.DataInput) @bci=1, line=87 (Compiled frame)
 - org.apache.hadoop.io.WritableUtils.readCompressedStringArray(java.io.DataInput) @bci=29, line=185 (Compiled frame)
 - org.apache.hadoop.conf.Configuration.readFields(java.io.DataInput) @bci=37, line=2564 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer.deserializeJobConf(byte[]) @bci=24, line=80 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.initializeJobConf() @bci=4, line=60 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.apply(org.apache.pig.data.Tuple) @bci=1, line=67 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter$PackageFunction.apply(java.lang.Object) @bci=5, line=48 (Compiled frame)
 - scala.collection.Iterator$$anon$11.next() @bci=13, line=328 (Compiled frame)
 - scala.collection.convert.Wrappers$IteratorWrapper.next() @bci=4, line=30 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.readNext() @bci=44, line=61 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.readNext() @bci=162, line=78 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.hasNext() @bci=1, line=91 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.readNext() @bci=133, line=75 (Compiled frame)
 - org.apache.pig.backend.hadoop.executionengine.spark.converter.POOutputConsumerIterator.hasNext() @bci=1, line=91 (Compiled frame)
 - scala.collection.convert.Wrappers$JIteratorWrapper.hasNext() @bci=4, line=41 (Compiled frame)
 - scala.collection.Iterator$$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
 - scala.collection.Iterator$$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
 - scala.collection.Iterator$class.foreach(scala.collection.Iterator, scala.Function1) @bci=1, line=727 (Compiled frame)
 - scala.collection.AbstractIterator.foreach(scala.Function1) @bci=2, line=1157 (Interpreted frame)
 - org.apache.spark.shuffle.hash.HashShuffleWriter.write(scala.collection.Iterator) @bci=95, line=65 (Interpreted frame)
 - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=170, line=68 (Interpreted frame)
 - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame)
 - org.apache.spark.scheduler.Task.run(long) @bci=67, line=54 (Interpreted frame)
 - org.apache.spark.executor.Executor$TaskRunner.run() @bci=336, line=177 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
{code}

I have added a check so that the JobConf is deserialized only on the first call, which makes the job run again.
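
For reference, a minimal sketch of the kind of guard described above (the confBytes field name and the surrounding skeleton are illustrative assumptions, not the actual patch):

{code:java}
// Serialized JobConf shipped to the executor (field name is an assumption).
private byte[] confBytes;
private transient JobConf jobConf;

private void initializeJobConf() {
    // Deserialize the JobConf only once per task instance,
    // instead of once per input tuple as before.
    if (jobConf == null) {
        jobConf = KryoSerializer.deserializeJobConf(confBytes);
    }
}

public Tuple apply(final Tuple t) {
    initializeJobConf();
    ...
}
{code}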



