tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3877. Delete unordered spill files once merge is done (Jason Lowe via jeagles)
Date Tue, 16 Jan 2018 17:23:11 GMT
Repository: tez
Updated Branches:
  refs/heads/master d777f455b -> f7feaa72b


TEZ-3877. Delete unordered spill files once merge is done (Jason Lowe via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f7feaa72
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f7feaa72
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f7feaa72

Branch: refs/heads/master
Commit: f7feaa72b4fc42676b54e9581165439e9c6d3df7
Parents: d777f45
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Tue Jan 16 11:22:10 2018 -0600
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Tue Jan 16 11:22:10 2018 -0600

----------------------------------------------------------------------
 .../writers/UnorderedPartitionedKVWriter.java   | 21 +++++++++++++++++---
 .../TestUnorderedPartitionedKVWriter.java       | 17 ++++++++++++----
 2 files changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f7feaa72/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 6ea0385..f4ebc97 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -111,8 +111,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   WrappedBuffer currentBuffer;
   private final FileSystem rfs;
 
-  private final List<SpillInfo> spillInfoList = Collections
-      .synchronizedList(new ArrayList<SpillInfo>());
+  @VisibleForTesting
+  final List<SpillInfo> spillInfoList = Collections.synchronizedList(new ArrayList<SpillInfo>());
 
   private final ListeningExecutorService spillExecutor;
 
@@ -1039,12 +1039,26 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       if (out != null) {
         out.close();
       }
+      deleteIntermediateSpills();
     }
     finalSpillRecord.writeToFile(finalIndexPath, conf);
     fileOutputBytesCounter.increment(indexFileSizeEstimate);
    LOG.info(destNameTrimmed + ": " + "Finished final spill after merging : " + numSpills.get() + " spills");
   }
 
+  private void deleteIntermediateSpills() {
+    // Delete the intermediate spill files
+    synchronized (spillInfoList) {
+      for (SpillInfo spill : spillInfoList) {
+        try {
+          rfs.delete(spill.outPath, false);
+        } catch (IOException e) {
+          LOG.warn("Unable to delete intermediate spill " + spill.outPath, e);
+        }
+      }
+    }
+  }
+
   private void writeLargeRecord(final Object key, final Object value, final int partition)
       throws IOException {
     numAdditionalSpillsCounter.increment(1);
@@ -1359,7 +1373,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
   }
 
-  private static class SpillInfo {
+  @VisibleForTesting
+  static class SpillInfo {
     final TezSpillRecord spillRecord;
     final Path outPath;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f7feaa72/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index f1cea7e..ae396cb 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -54,6 +54,7 @@ import com.google.protobuf.ByteString;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter.SpillInfo;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
 import org.roaringbitmap.RoaringBitmap;
@@ -1238,13 +1239,21 @@ public class TestUnorderedPartitionedKVWriter {
     Path outputFilePath = kvWriter.finalOutPath;
     Path spillFilePath = kvWriter.finalIndexPath;
 
-    if (numRecordsWritten > 0) {
-      assertTrue(localFs.exists(outputFilePath));
-      assertTrue(localFs.exists(spillFilePath));
-    } else {
+    if (numRecordsWritten <= 0) {
       return;
     }
 
+    assertTrue(localFs.exists(outputFilePath));
+    assertTrue(localFs.exists(spillFilePath));
+
+    // verify no intermediate spill files have been left around
+    synchronized (kvWriter.spillInfoList) {
+      for (SpillInfo spill : kvWriter.spillInfoList) {
+        assertFalse("lingering intermediate spill file " + spill.outPath,
+            localFs.exists(spill.outPath));
+      }
+    }
+
     // Special case for 0 records.
     TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
     DataInputBuffer keyBuffer = new DataInputBuffer();


Mime
View raw message