spark-commits mailing list archives

From ma...@apache.org
Subject [1/7] git commit: Replace magic lengths with constants in PySpark.
Date Wed, 27 Nov 2013 04:55:58 GMT
Updated Branches:
  refs/heads/master 330ada176 -> fb6875dd5


Replace magic lengths with constants in PySpark.

Write the number of entries in the accumulators section up-front rather
than terminating it with a negative length record. I find this
easier to read.
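
For context on the framing change: the old protocol terminated the accumulator
section with a negative length sentinel, while the new one writes the number of
updates up-front. The following is a minimal, self-contained Python sketch of
the two framings; the write_updates_old / write_updates_new / read_updates_new
helpers are invented for illustration and are not part of this commit (the real
code lives in python/pyspark/worker.py and PythonRDD.scala).

    import struct
    from io import BytesIO

    def write_int(value, stream):
        # Big-endian 32-bit int, matching java.io.DataInputStream.readInt
        # on the JVM side.
        stream.write(struct.pack(">i", value))

    def read_int(stream):
        return struct.unpack(">i", stream.read(4))[0]

    def write_updates_old(updates, stream):
        # Old framing: length-prefixed records, terminated by a -1 sentinel.
        for u in updates:
            write_int(len(u), stream)
            stream.write(u)
        write_int(-1, stream)

    def write_updates_new(updates, stream):
        # New framing: the record count is written up-front, no sentinel.
        write_int(len(updates), stream)
        for u in updates:
            write_int(len(u), stream)
            stream.write(u)

    def read_updates_new(stream):
        count = read_int(stream)
        return [stream.read(read_int(stream)) for _ in range(count)]

    buf = BytesIO()
    write_updates_new([b"update-1", b"update-22"], buf)
    buf.seek(0)
    assert read_updates_new(buf) == [b"update-1", b"update-22"]

The Scala reader's match on SpecialLengths.END_OF_DATA_SECTION then consumes the
count-prefixed section the same way read_updates_new does here.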


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a48d88d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a48d88d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a48d88d2

Branch: refs/heads/master
Commit: a48d88d206fae348720ab077a624b3c57293374f
Parents: 41ead7a
Author: Josh Rosen <joshrosen@apache.org>
Authored: Sat Nov 2 21:13:18 2013 -0700
Committer: Josh Rosen <joshrosen@apache.org>
Committed: Sun Nov 3 10:54:24 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala | 26 ++++++++++++--------
 python/pyspark/serializers.py                   |  6 +++++
 python/pyspark/worker.py                        | 13 +++++-----
 3 files changed, 29 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a48d88d2/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 12b4d94..0d5913e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -132,7 +132,7 @@ private[spark] class PythonRDD[T: ClassManifest](
               val obj = new Array[Byte](length)
               stream.readFully(obj)
               obj
-            case -3 =>
+            case SpecialLengths.TIMING_DATA =>
               // Timing data from worker
               val bootTime = stream.readLong()
               val initTime = stream.readLong()
@@ -143,24 +143,24 @@ private[spark] class PythonRDD[T: ClassManifest](
               val total = finishTime - startTime
               logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total,
boot, init, finish))
               read
-            case -2 =>
+            case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
               // Signals that an exception has been thrown in python
               val exLength = stream.readInt()
               val obj = new Array[Byte](exLength)
               stream.readFully(obj)
               throw new PythonException(new String(obj))
-            case -1 =>
+            case SpecialLengths.END_OF_DATA_SECTION =>
               // We've finished the data section of the output, but we can still
-              // read some accumulator updates; let's do that, breaking when we
-              // get a negative length record.
-              var len2 = stream.readInt()
-              while (len2 >= 0) {
-                val update = new Array[Byte](len2)
+              // read some accumulator updates:
+              val numAccumulatorUpdates = stream.readInt()
+              (1 to numAccumulatorUpdates).foreach { _ =>
+                val updateLen = stream.readInt()
+                val update = new Array[Byte](updateLen)
                 stream.readFully(update)
                 accumulator += Collections.singletonList(update)
-                len2 = stream.readInt()
+
               }
-              new Array[Byte](0)
+              Array.empty[Byte]
           }
         } catch {
           case eof: EOFException => {
@@ -197,6 +197,12 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   val asJavaPairRDD : JavaPairRDD[Long, Array[Byte]] = JavaPairRDD.fromRDD(this)
 }
 
+private object SpecialLengths {
+  val END_OF_DATA_SECTION = -1
+  val PYTHON_EXCEPTION_THROWN = -2
+  val TIMING_DATA = -3
+}
+
 private[spark] object PythonRDD {
 
   /** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */

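To summarize the dispatch that the Scala match above performs, here is a rough
Python rendering of the same read loop; it is a sketch under the assumption of
a blocking binary stream, and names such as read_worker_output are invented for
the example rather than taken from the PySpark code.

    import struct

    class SpecialLengths(object):
        END_OF_DATA_SECTION = -1
        PYTHON_EXCEPTION_THROWN = -2
        TIMING_DATA = -3

    def read_int(stream):
        return struct.unpack(">i", stream.read(4))[0]

    def read_long(stream):
        return struct.unpack(">q", stream.read(8))[0]

    def read_worker_output(stream):
        # Mirrors PythonRDD's read loop: a non-negative length prefixes an
        # ordinary data record; negative values select a special record type.
        records, accumulator_updates = [], []
        while True:
            length = read_int(stream)
            if length >= 0:
                records.append(stream.read(length))
            elif length == SpecialLengths.TIMING_DATA:
                # Timing values are only logged on the JVM side; read and ignore.
                boot, init, finish = read_long(stream), read_long(stream), read_long(stream)
            elif length == SpecialLengths.PYTHON_EXCEPTION_THROWN:
                raise RuntimeError(stream.read(read_int(stream)).decode("utf-8"))
            elif length == SpecialLengths.END_OF_DATA_SECTION:
                # Count-prefixed accumulator section follows, then the stream ends.
                for _ in range(read_int(stream)):
                    accumulator_updates.append(stream.read(read_int(stream)))
                return records, accumulator_updates
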
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a48d88d2/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 54fed1c..fbc280f 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -19,6 +19,12 @@ import struct
 import cPickle
 
 
+class SpecialLengths(object):
+    END_OF_DATA_SECTION = -1
+    PYTHON_EXCEPTION_THROWN = -2
+    TIMING_DATA = -3
+
+
 class Batch(object):
     """
     Used to store multiple RDD entries as a single Java object.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a48d88d2/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index d63c2aa..7696df9 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -31,7 +31,8 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
 from pyspark.cloudpickle import CloudPickler
 from pyspark.files import SparkFiles
 from pyspark.serializers import write_with_length, read_with_length, write_int, \
-    read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
+    read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file, \
+    SpecialLengths
 
 
 def load_obj(infile):
@@ -39,7 +40,7 @@ def load_obj(infile):
 
 
 def report_times(outfile, boot, init, finish):
-    write_int(-3, outfile)
+    write_int(SpecialLengths.TIMING_DATA, outfile)
     write_long(1000 * boot, outfile)
     write_long(1000 * init, outfile)
     write_long(1000 * finish, outfile)
@@ -82,16 +83,16 @@ def main(infile, outfile):
         for obj in func(split_index, iterator):
             write_with_length(dumps(obj), outfile)
     except Exception as e:
-        write_int(-2, outfile)
+        write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
         write_with_length(traceback.format_exc(), outfile)
         sys.exit(-1)
     finish_time = time.time()
     report_times(outfile, boot_time, init_time, finish_time)
     # Mark the beginning of the accumulators section of the output
-    write_int(-1, outfile)
-    for aid, accum in _accumulatorRegistry.items():
+    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
+    write_int(len(_accumulatorRegistry), outfile)
+    for (aid, accum) in _accumulatorRegistry.items():
         write_with_length(dump_pickle((aid, accum._value)), outfile)
-    write_int(-1, outfile)
 
 
 if __name__ == '__main__':

