pig-dev mailing list archives

From "Rohini Palaniswamy (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (PIG-5359) Reduce time spent in split serialization
Date Tue, 09 Oct 2018 02:45:00 GMT

     [ https://issues.apache.org/jira/browse/PIG-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rohini Palaniswamy updated PIG-5359:
------------------------------------
       Resolution: Fixed
     Hadoop Flags: Reviewed
    Fix Version/s: 0.18.0
           Status: Resolved  (was: Patch Available)

+1 to PIG-5359-3.patch from review board. Uploaded it here and committed to trunk.

> Reduce time spent in split serialization
> ----------------------------------------
>
>                 Key: PIG-5359
>                 URL: https://issues.apache.org/jira/browse/PIG-5359
>             Project: Pig
>          Issue Type: Improvement
>            Reporter: Satish Subhashrao Saley
>            Assignee: Satish Subhashrao Saley
>            Priority: Major
>             Fix For: 0.18.0
>
>         Attachments: PIG-5359-3.patch
>
>
> 1. Unnecessary serialization of splits in Tez.
>  In LoaderProcessor, Pig calls
>  [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java#L172]
> {code:java}
> tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf, false, 0));
> {code}
> This call ends up serializing the splits just to log their serialized size.
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L317]
> {code:java}
>   public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf,
>       boolean groupSplits, boolean sortSplits, int targetTasks)
>       throws IOException, ClassNotFoundException, InterruptedException {
>       ....
>       ....
>           LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
>         + splitInfoMem.getSplitsProto().getSerializedSize());
>     return splitInfoMem;
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L106]
> {code:java}
>   public MRSplitsProto getSplitsProto() {
>     if (isNewSplit) {
>       try {
>         return createSplitsProto(newFormatSplits, new SerializationFactory(conf));
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L152-L170]
> {code:java}
>   private static MRSplitsProto createSplitsProto(
>       org.apache.hadoop.mapreduce.InputSplit[] newSplits,
>       SerializationFactory serializationFactory) throws IOException,
>       InterruptedException {
>     MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
>     for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
>       splitsBuilder.addSplits(MRInputHelpers.createSplitProto(newSplit, serializationFactory));
>     }
>     return splitsBuilder.build();
>   }
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L221-L259]
> 2. In TezDagBuilder, if splitsSerializedSize > spillThreshold, the InputSplits serialized in MRSplitsProto are not used by Pig; it serializes them again directly to disk via JobSplitWriter.createSplitFiles. The InputSplit serialization logic therefore runs a second time, which is wasteful and expensive in cases like HCat.
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L946-L947]
> {code:java}
> MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
> int splitsSerializedSize = splitsProto.getSerializedSize();
> {code}
> getSplitsProto creates an MRSplitsProto, which consists of a list of MRSplitProto. Each MRSplitProto holds the serialized bytes of one InputSplit. If splitsSerializedSize > spillThreshold, Pig writes the splits to disk via
> {code:java}
> if(splitsSerializedSize > spillThreshold) {
>     inputPayLoad.setBoolean(
>             org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
>             false);
>     // Write splits to disk
>     Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
>     log.info("Writing input splits to " + inputSplitsDir
>             + " for vertex " + vertex.getName()
>             + " as the serialized size in memory is "
>             + splitsSerializedSize + ". Configured "
>             + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
>             + " is " + spillThreshold);
>     inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
>             (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, payloadConf, fs);
> {code}
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L960]
>  [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java#L302-L314]
> Solution:
>  1. Do not serialize the split in LoaderProcessor.java
>  2. In TezDagBuilder.java, serialize each input split while accumulating the serialized size. If the total exceeds spillThreshold, write the splits to disk, reusing the serialized buffers of each split.
>  
> Thank you [~rohini] for identifying the issue.
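
The idea behind the proposed solution can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code from PIG-5359-3.patch: `SplitSpillSketch`, `Split`, `Result`, `serializeOnce` and `writeBuffers` are hypothetical names, and a plain `DataOutputStream` stands in for Hadoop's serialization framework and for `JobSplitWriter`. It only shows that each split is serialized once, that the sizes are summed, and that the spill path writes the buffers already produced instead of serializing again.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: every name here is hypothetical, not a Pig or Tez API.
final class SplitSpillSketch {

    /** Stand-in for an InputSplit whose serialization is expensive. */
    interface Split {
        void write(DataOutputStream out) throws IOException;
    }

    /** Buffers from a single serialization pass, plus the spill decision. */
    static final class Result {
        final List<byte[]> buffers;  // one buffer per split
        final long totalBytes;       // sum of all buffer lengths
        final boolean spill;         // true when totalBytes > spillThreshold

        Result(List<byte[]> buffers, long totalBytes, boolean spill) {
            this.buffers = buffers;
            this.totalBytes = totalBytes;
            this.spill = spill;
        }
    }

    /** Serializes every split exactly once while accumulating the total size. */
    static Result serializeOnce(List<? extends Split> splits, long spillThreshold) {
        List<byte[]> buffers = new ArrayList<>(splits.size());
        long total = 0;
        try {
            for (Split split : splits) {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(bytes);
                split.write(out);
                out.flush();
                byte[] buffer = bytes.toByteArray();
                buffers.add(buffer);
                total += buffer.length;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Result(buffers, total, total > spillThreshold);
    }

    /** Spill path: writes the existing buffers and never calls Split.write again. */
    static void writeBuffers(Result result, OutputStream sink) {
        try {
            DataOutputStream out = new DataOutputStream(sink);
            out.writeInt(result.buffers.size());
            for (byte[] buffer : result.buffers) {
                out.writeInt(buffer.length);  // length prefix, then the raw bytes
                out.write(buffer);
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the caller checks `Result.spill` and, when it is set, hands the same `Result` to `writeBuffers` with a file-backed stream; otherwise the buffers could be sent in memory. Either way the costly `Split.write` runs only once per split.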



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
