spark-reviews mailing list archives

From tdas <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-5154] [PySpark] [Streaming] Kafka strea...
Date Fri, 30 Jan 2015 00:57:17 GMT
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3715#discussion_r23817104
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -371,54 +372,28 @@ private[spark] object PythonRDD extends Logging {
       }
     
       def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
    -    // The right way to implement this would be to use TypeTags to get the full
    -    // type of T.  Since I don't want to introduce breaking changes throughout the
    -    // entire Spark API, I have to use this hacky approach:
    -    if (iter.hasNext) {
    -      val first = iter.next()
    -      val newIter = Seq(first).iterator ++ iter
    -      first match {
    -        case arr: Array[Byte] =>
    -          newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes =>
    -            dataOut.writeInt(bytes.length)
    -            dataOut.write(bytes)
    -          }
    -        case string: String =>
    -          newIter.asInstanceOf[Iterator[String]].foreach { str =>
    -            writeUTF(str, dataOut)
    -          }
    -        case stream: PortableDataStream =>
    -          newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
    -            val bytes = stream.toArray()
    -            dataOut.writeInt(bytes.length)
    -            dataOut.write(bytes)
    -          }
    -        case (key: String, stream: PortableDataStream) =>
    -          newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
    -            case (key, stream) =>
    -              writeUTF(key, dataOut)
    -              val bytes = stream.toArray()
    -              dataOut.writeInt(bytes.length)
    -              dataOut.write(bytes)
    -          }
    -        case (key: String, value: String) =>
    -          newIter.asInstanceOf[Iterator[(String, String)]].foreach {
    -            case (key, value) =>
    -              writeUTF(key, dataOut)
    -              writeUTF(value, dataOut)
    -          }
    -        case (key: Array[Byte], value: Array[Byte]) =>
    -          newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
    -            case (key, value) =>
    -              dataOut.writeInt(key.length)
    -              dataOut.write(key)
    -              dataOut.writeInt(value.length)
    -              dataOut.write(value)
    -          }
    -        case other =>
    -          throw new SparkException("Unexpected element type " + first.getClass)
    -      }
    +
    +    def write(obj: Any): Unit = obj match {
    --- End diff --
    
    nit: this method could be made a final Scala method so the compiler can apply tail-recursion optimization.
    http://tech.pro/blog/2112/scala-tail-recursion-optimisation-and-comparison-to-java
    That might help performance for large nested objects. This can be a different PR entirely.
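    
    For reference, a minimal sketch of the idea (hypothetical code, not what this PR adds): scalac only rewrites a self-recursive call into a loop when the method cannot be overridden, i.e. when it is final, private, or a local def, and @annotation.tailrec makes the build fail if that rewrite is not possible. The writeAll helper below and its simplified cases are illustration only:
    
        import java.io.DataOutputStream
        import scala.annotation.tailrec
        
        object TailRecSketch {
          // Hypothetical worklist-based variant of the PR's `write`. Methods on
          // an object are effectively final, so scalac may apply tail-call
          // optimization; @tailrec turns "may" into a compile-time guarantee.
          @tailrec
          final def writeAll(objs: List[Any], dataOut: DataOutputStream): Unit =
            objs match {
              case Nil => ()
              case (bytes: Array[Byte]) :: rest =>
                dataOut.writeInt(bytes.length)
                dataOut.write(bytes)
                writeAll(rest, dataOut) // tail position: compiled as a jump
              case (key: Array[Byte], value: Array[Byte]) :: rest =>
                // Flatten the pair onto the worklist instead of recursing into
                // each element, so the one recursive call stays in tail position.
                writeAll(key :: value :: rest, dataOut)
              case other :: _ =>
                throw new IllegalArgumentException("Unexpected element type " + other.getClass)
            }
        }
    
    Note that the PR's `write` recurses into both halves of a pair, so at most one of those two calls can sit in tail position; a worklist as above is one way to keep deep nesting off the JVM stack if the optimization ever matters.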



