spark-commits mailing list archives

From pwend...@apache.org
Subject [1/2] git commit: adding cloneRecords parameter to equivalent Java APIs
Date Fri, 17 Jan 2014 07:17:38 GMT
Updated Branches:
  refs/heads/master c06a307ca -> d4fd89e3c


adding cloneRecords parameter to equivalent Java APIs


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/fcb4fc65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/fcb4fc65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/fcb4fc65

Branch: refs/heads/master
Commit: fcb4fc653d1a66ff393dde335551ae3059f112c9
Parents: b93f9d4
Author: Prashant Sharma <scrapcodes@gmail.com>
Authored: Tue Jan 14 20:13:55 2014 +0530
Committer: Prashant Sharma <scrapcodes@gmail.com>
Committed: Fri Jan 17 11:16:03 2014 +0530

----------------------------------------------------------------------
 .../spark/api/java/JavaSparkContext.scala       | 116 ++++++++++++++++++-
 1 file changed, 114 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
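For reference, a minimal Scala sketch of how the new cloneRecords overloads added below might be called. The master string, application name, object name, and SequenceFile path are illustrative assumptions, not part of the patch.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

object CloneRecordsSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local", "cloneRecordsSketch")

    // Existing overload: records produced by the Hadoop RecordReader are cloned by
    // default, so the resulting RDD can be collected or cached safely.
    val cloned: JavaPairRDD[Text, IntWritable] =
      jsc.sequenceFile("/tmp/counts.seq", classOf[Text], classOf[IntWritable])

    // New overload: pass cloneRecords = false to skip the defensive copy when each
    // record is consumed immediately, trading safety for fewer allocations.
    val reused: JavaPairRDD[Text, IntWritable] =
      jsc.sequenceFile("/tmp/counts.seq", classOf[Text], classOf[IntWritable], 4, false)

    println(cloned.count())
    println(reused.count())
    jsc.stop()
  }
}

The Boolean is passed straight through to the corresponding SparkContext method, so the Java API behaviour stays aligned with the Scala API.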


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fcb4fc65/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 7a6f044..0dd0a11 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -137,7 +137,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    */
   def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
 
-  /**Get an RDD for a Hadoop SequenceFile with given key and value types. */
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
   def sequenceFile[K, V](path: String,
     keyClass: Class[K],
     valueClass: Class[V],
@@ -148,7 +148,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
   }
 
-  /**Get an RDD for a Hadoop SequenceFile. */
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
+  def sequenceFile[K, V](path: String,
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int,
+    cloneRecords: Boolean
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits, cloneRecords))
+  }
+
+  /** Get an RDD for a Hadoop SequenceFile. */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
   JavaPairRDD[K, V] = {
     implicit val kcm: ClassTag[K] = ClassTag(keyClass)
@@ -156,6 +168,15 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
   }
 
+  /** Get an RDD for a Hadoop SequenceFile. */
+  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], 
+    cloneRecords: Boolean):
+  JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, cloneRecords))
+  }
+
   /**
    * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
    * BytesWritable values that contain a serialized partition. This is still an experimental storage
@@ -197,6 +218,37 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
+   * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
+   * using the older MapReduce API (`org.apache.hadoop.mapred`).
+   *
+   * @param conf JobConf for setting up the dataset
+   * @param inputFormatClass Class of the [[InputFormat]]
+   * @param keyClass Class of the keys
+   * @param valueClass Class of the values
+   * @param minSplits Minimum number of Hadoop Splits to generate.
+   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+   *                     Most RecordReader implementations reuse wrapper objects across multiple
+   *                     records, and can cause problems in RDD collect or aggregation operations.
+   *                     By default the records are cloned in Spark. However, application
+   *                     programmers can explicitly disable the cloning for better performance.
+   */
+  def hadoopRDD[K, V, F <: InputFormat[K, V]](
+    conf: JobConf,
+    inputFormatClass: Class[F],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int,
+    cloneRecords: Boolean
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits,
+      cloneRecords))
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
@@ -231,6 +283,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     path: String,
     inputFormatClass: Class[F],
     keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int,
+    cloneRecords: Boolean
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, 
+      minSplits, cloneRecords))
+  }
+
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  def hadoopFile[K, V, F <: InputFormat[K, V]](
+    path: String,
+    inputFormatClass: Class[F],
+    keyClass: Class[K],
     valueClass: Class[V]
     ): JavaPairRDD[K, V] = {
     implicit val kcm: ClassTag[K] = ClassTag(keyClass)
@@ -239,6 +306,20 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       inputFormatClass, keyClass, valueClass))
   }
 
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  def hadoopFile[K, V, F <: InputFormat[K, V]](
+    path: String,
+    inputFormatClass: Class[F],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    cloneRecords: Boolean
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
+    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
+    new JavaPairRDD(sc.hadoopFile(path,
+      inputFormatClass, keyClass, valueClass, cloneRecords = cloneRecords))
+  }
+
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
@@ -258,6 +339,22 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    */
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+    path: String,
+    fClass: Class[F],
+    kClass: Class[K],
+    vClass: Class[V],
+    conf: Configuration,
+    cloneRecords: Boolean): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(kClass)
+    implicit val vcm: ClassTag[V] = ClassTag(vClass)
+    new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf, cloneRecords))
+  }
+
+  /**
+   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
+   * and extra configuration options to pass to the input format.
+   */
   def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
     conf: Configuration,
     fClass: Class[F],
@@ -268,6 +365,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
   }
 
+  /**
+   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
+   * and extra configuration options to pass to the input format.
+   */
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+    conf: Configuration,
+    fClass: Class[F],
+    kClass: Class[K],
+    vClass: Class[V],
+    cloneRecords: Boolean): JavaPairRDD[K, V] = {
+    implicit val kcm: ClassTag[K] = ClassTag(kClass)
+    implicit val vcm: ClassTag[V] = ClassTag(vClass)
+    new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass, cloneRecords))
+  }
+
   /** Build the union of two or more RDDs. */
   override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
     val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
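
For the new hadoopRDD overload documented earlier in this patch, a hedged Scala usage sketch; the JobConf input path, master string, split count, and object name are illustrative assumptions, not part of the patch.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.api.java.JavaSparkContext

object HadoopRDDCloneRecordsSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local", "hadoopRDDCloneRecordsSketch")

    // JobConf describing the dataset to read through the old MapReduce API.
    val conf = new JobConf()
    FileInputFormat.setInputPaths(conf, "/tmp/logs")

    // cloneRecords = false skips the defensive copy Spark makes by default.
    // That is only safe when each Writable is consumed before the RecordReader
    // reuses it: count() below touches every record once and discards it, whereas
    // collecting or caching the raw Writables would require the default cloning.
    val records = jsc.hadoopRDD(conf, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], 2, false)

    println(records.count())
    jsc.stop()
  }
}

Passing true (or using the existing overloads) keeps the default cloning behaviour described in the doc comment, which is the safe choice when records are collected or aggregated.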

