spark-commits mailing list archives

From: dav...@apache.org
Subject: spark git commit: [SPARK-7339] [PYSPARK] PySpark shuffle spill memory sometimes are not correct
Date: Tue, 26 May 2015 15:36:19 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 79bb7dcec -> 25b2f95fe


[SPARK-7339] [PYSPARK] PySpark shuffle spill memory sometimes are not correct

In PySpark we sample the memory used before and after a spill and report the difference
as memorySpilled, but if the before value is smaller than the after value we end up with
a negative value; in that scenario a value of 0 is more reasonable.
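
For illustration, a minimal standalone sketch of the problem and the max() clamp
(record_spill and the memory figures here are hypothetical, not the PySpark source;
readings are in MB, as with get_used_memory() in pyspark/shuffle.py):

    # Memory readings are in MB; << 20 converts MB to bytes.
    def record_spill(memory_bytes_spilled, used_before_mb, used_after_mb):
        # If other allocations land between the two samples, the "after"
        # reading can exceed the "before" reading and the delta goes negative.
        delta_mb = used_before_mb - used_after_mb
        return memory_bytes_spilled + (max(delta_mb, 0) << 20)

    # A concurrent 1 MB allocation during the spill: the naive delta would be
    # -1 MB, i.e. (-1) << 20 = -1048576.0 B, the value seen in the HistoryServer
    # results below; the clamp reports 0 instead.
    print(record_spill(0, used_before_mb=100, used_after_mb=101))  # 0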

Below are the results we observed in the HistoryServer:
Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Write Time | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk) | Errors
0 | 0 | 0 | SUCCESS | NODE_LOCAL | 3 / vm119 | 2015/05/04 17:31:06 | 21 s | 0.1 s | 128.1 MB (hadoop) / 3237 | 70 ms | 10.1 MB / 2529 | 0.0 B | 5.7 MB |
2 | 2 | 0 | SUCCESS | NODE_LOCAL | 1 / vm118 | 2015/05/04 17:31:06 | 22 s | 89 ms | 128.1 MB (hadoop) / 3205 | 0.1 s | 10.1 MB / 2529 | -1048576.0 B | 5.9 MB |
1 | 1 | 0 | SUCCESS | NODE_LOCAL | 2 / vm117 | 2015/05/04 17:31:06 | 22 s | 0.1 s | 128.1 MB (hadoop) / 3271 | 68 ms | 10.1 MB / 2529 | -1048576.0 B | 5.6 MB |
4 | 4 | 0 | SUCCESS | NODE_LOCAL | 2 / vm117 | 2015/05/04 17:31:06 | 22 s | 0.1 s | 128.1 MB (hadoop) / 3192 | 51 ms | 10.1 MB / 2529 | -1048576.0 B | 5.9 MB |
3 | 3 | 0 | SUCCESS | NODE_LOCAL | 3 / vm119 | 2015/05/04 17:31:06 | 22 s | 0.1 s | 128.1 MB (hadoop) / 3262 | 51 ms | 10.1 MB / 2529 | 1024.0 KB | 5.8 MB |
5 | 5 | 0 | SUCCESS | NODE_LOCAL | 1 / vm118 | 2015/05/04 17:31:06 | 22 s | 89 ms | 128.1 MB (hadoop) / 3256 | 93 ms | 10.1 MB / 2529 | -1048576.0 B | 5.7 MB |

/cc davies

Author: linweizhong <linweizhong@huawei.com>

Closes #5887 from Sephiroth-Lin/spark-7339 and squashes the following commits:

9186c81 [linweizhong] Use max function to get a nonnegative value
d41672b [linweizhong] Update MemoryBytesSpilled when memorySpilled > 0

(cherry picked from commit 8948ad3fb5d5d095d3942855960d735f27d97dd5)
Signed-off-by: Davies Liu <davies@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25b2f95f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25b2f95f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25b2f95f

Branch: refs/heads/branch-1.4
Commit: 25b2f95fe3690d1393fe138b80a53adc798a20fd
Parents: 79bb7dc
Author: linweizhong <linweizhong@huawei.com>
Authored: Tue May 26 08:35:39 2015 -0700
Committer: Davies Liu <davies@databricks.com>
Committed: Tue May 26 08:36:08 2015 -0700

----------------------------------------------------------------------
 python/pyspark/shuffle.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/25b2f95f/python/pyspark/shuffle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 1d0b16c..81c420c 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -362,7 +362,7 @@ class ExternalMerger(Merger):
 
         self.spills += 1
         gc.collect()  # release the memory as much as possible
-        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+        MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
 
     def items(self):
         """ Return all merged items as iterator """
@@ -515,7 +515,7 @@ class ExternalSorter(object):
                 gc.collect()
                 batch //= 2
                 limit = self._next_limit()
-                MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+                MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
                 DiskBytesSpilled += os.path.getsize(path)
                 os.unlink(path)  # data will be deleted after close
 
@@ -630,7 +630,7 @@ class ExternalList(object):
         self.values = []
         gc.collect()
         DiskBytesSpilled += self._file.tell() - pos
-        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+        MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
 
 
 class ExternalListOfList(ExternalList):
@@ -794,7 +794,7 @@ class ExternalGroupBy(ExternalMerger):
 
         self.spills += 1
         gc.collect()  # release the memory as much as possible
-        MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
+        MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
 
     def _merged_items(self, index):
         size = sum(os.path.getsize(os.path.join(self._get_spill_dir(j), str(index)))


