mnemonic-commits mailing list archives

From ga...@apache.org
Subject incubator-mnemonic git commit: MNEMONIC-258: Implement the functionalities of DurableRDD's direct IO
Date Tue, 09 May 2017 19:48:58 GMT
Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master a7b94583c -> 636532e11


MNEMONIC-258: Implement the functionalities of DurableRDD's direct IO


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/636532e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/636532e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/636532e1

Branch: refs/heads/master
Commit: 636532e117295d2bd36cf74a473326ae06b60347
Parents: a7b9458
Author: Wang, Gang(Gary) <gang1.wang@intel.com>
Authored: Mon May 8 16:53:53 2017 -0700
Committer: Wang, Gang(Gary) <gang1.wang@intel.com>
Committed: Tue May 9 12:45:48 2017 -0700

----------------------------------------------------------------------
 .../mnemonic/spark/DurableSparkFunctions.scala  |  41 ++++++
 .../apache/mnemonic/spark/rdd/DurableRDD.scala  | 128 +++++++++++++------
 .../spark/rdd/DurableRDDFunctions.scala         |  13 +-
 .../spark/rdd/DurableSparkFunctions.scala       |  42 ------
 4 files changed, 141 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/636532e1/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableSparkFunctions.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableSparkFunctions.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableSparkFunctions.scala
new file mode 100644
index 0000000..cd50cb6
--- /dev/null
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/DurableSparkFunctions.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.spark
+
+import org.apache.spark._
+import scala.reflect.ClassTag
+import scala.language.implicitConversions
+import org.apache.mnemonic.DurableType
+import org.apache.mnemonic.EntityFactoryProxy
+import org.apache.mnemonic.spark.rdd.DurableRDD
+
+class DurableSparkFunctions(sc: SparkContext) extends Serializable {
+
+  def mnemonic[D: ClassTag] (path: String,
+                     serviceName: String,
+                     durableTypes: Array[DurableType],
+                     entityFactoryProxies: Array[EntityFactoryProxy],
+                     slotKeyId: Long) = {
+    DurableRDD[D](sc, path: String,
+      serviceName, durableTypes, entityFactoryProxies, slotKeyId)
+  }
+}
+
+object DurableSparkFunctions {
+  implicit def addDurableFunctions(sc: SparkContext) = new DurableSparkFunctions(sc)
+}

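For reviewers, here is a minimal read-side sketch of the implicit API introduced above. The directory, the "pmalloc" service name, the slot key id and the use of plain Long values are illustrative assumptions for this example, not values shipped by the patch.

    import org.apache.mnemonic.DurableType
    import org.apache.mnemonic.EntityFactoryProxy
    import org.apache.mnemonic.spark.DurableSparkFunctions._
    import org.apache.spark.{SparkConf, SparkContext}

    object DurableReadSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("durable-read"))
        // The implicit conversion in DurableSparkFunctions lets the durable
        // input directory be opened directly from the SparkContext.
        val values = sc.mnemonic[Long](
          "/tmp/durable-rdd-data",      // assumed directory of .mne pool files
          "pmalloc",                    // assumed memory allocation service
          Array(DurableType.LONG),      // element type stored in the pools
          Array[EntityFactoryProxy](),  // assumed: no proxies needed for primitive types
          2L)                           // assumed slot key id
        println(s"restored ${values.count()} durable elements")
        sc.stop()
      }
    }

Because the corresponding DurableRDD.apply passes null dependencies, the resulting RDD is input-only: its partitions are discovered from the existing pool files rather than derived from a parent RDD.
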
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/636532e1/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
index a6a15a7..5386abf 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDD.scala
@@ -22,17 +22,18 @@ import java.io.File
 import org.apache.spark.rdd.RDD
 import org.apache.spark._
 import org.apache.commons.io.FileUtils
-import scala.reflect.{ ClassTag }
+
+import scala.reflect.ClassTag
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import org.apache.mnemonic.DurableType
-import org.apache.mnemonic.EntityFactoryProxy
-import org.apache.mnemonic.NonVolatileMemAllocator
+import org.apache.mnemonic.{ConfigurationException, DurableType, EntityFactoryProxy, NonVolatileMemAllocator}
 import org.apache.mnemonic.sessions.ObjectCreator
 import org.apache.mnemonic.spark.MneDurableInputSession
 import org.apache.mnemonic.spark.MneDurableOutputSession
 import org.apache.mnemonic.spark.DurableException
 
+import scala.collection.mutable
+
 private[spark] class DurableRDD[D: ClassTag, T: ClassTag] (
   @transient private var _sc: SparkContext,
   @transient private var deps: Seq[Dependency[_]],
@@ -45,40 +46,47 @@ private[spark] class DurableRDD[D: ClassTag, T: ClassTag] (
 
   private val isInputOnly =  null == deps
 
-  private val durdddir = DurableRDD.getRddDirName(durableDirectory, id)
-  DurableRDD.resetRddDir(durdddir)
-
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  private var durdddir:String = _
+  if (isInputOnly) {
+    val dir = new File(durableDirectory)
+    if (!dir.exists) {
+      throw new ConfigurationException("Input directory does not exist")
+    }
+    durdddir = durableDirectory
+  } else {
+    durdddir = DurableRDD.getRddDirName(durableDirectory, id)
+    DurableRDD.resetRddDir(durdddir)
+  }
 
-  override protected def getPartitions: Array[Partition] = firstParent[T].partitions
+  override val partitioner = if (!isInputOnly && preservesPartitioning) firstParent[T].partitioner else None
 
-  def prepareDurablePartition(split: Partition, context: TaskContext,
-      iterator: Iterator[T]): Array[File] = {
-    val outsess = MneDurableOutputSession[D](serviceName,
-        durableTypes, entityFactoryProxies, slotKeyId,
-        partitionPoolSize, durdddir.toString,
-        DurableRDD.genDurableFileName(split.hashCode)_)
-    try {
-      for (item <- iterator) {
-        f(item, outsess) match {
-          case Some(res) => outsess.post(res)
-          case None =>
-        }
+  override protected def getPartitions: Array[Partition] = {
+    if (isInputOnly) {
+      val ret = DurableRDD.collectMemPoolPartitionList(durdddir).getOrElse(Array[Partition]())
+      if (ret.isEmpty) {
+        logInfo(s"Not found any partitions in the directory ${durdddir}")
       }
-    } finally {
-      outsess.close()
+      ret
+    } else {
+      firstParent[T].partitions
     }
-    outsess.memPools.toArray
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[D] = {
     val mempListOpt: Option[Array[File]] =
-      DurableRDD.collectMemPoolFileList(durdddir.toString, DurableRDD.genDurableFileName(split.hashCode)_)
+      DurableRDD.collectMemPoolFileList(durdddir, DurableRDD.genDurableFileName(context.partitionId)_)
     val memplist = mempListOpt match {
       case None => {
-        val mplst = prepareDurablePartition(split, context, firstParent[T].iterator(split, context))
-        logInfo(s"Done transformed RDD #${firstParent[T].id} to durableRDD #${id} on ${durdddir.toString}")
-        mplst
+        if (isInputOnly) {
+          logInfo(s"Not found any mem pool files related to the partition #${context.partitionId}")
+          Array[File]()
+        } else {
+          val mplst = DurableRDD.prepareDurablePartition[D, T](durdddir,
+            serviceName, durableTypes, entityFactoryProxies, slotKeyId,
+            partitionPoolSize, f)(context, firstParent[T].iterator(split, context))
+          logInfo(s"Done transformed RDD #${firstParent[T].id} to durableRDD #${id} on ${durdddir}")
+          mplst
+        }
       }
       case Some(mplst) => mplst
     }
@@ -104,6 +112,7 @@ object DurableRDD {
 
   val durableSubDirNameTemplate = "durable-rdd-%010d"
   val durableFileNameTemplate = "mem_%010d_%010d.mne"
+  val durableFileNamePartitionRegex = raw"mem_(\d{10})_0000000000.mne".r
 
   private var durableDir: Option[String] = None
 
@@ -146,23 +155,40 @@ object DurableRDD {
   }
 
   def createRddDir(rddDirName: String) {
-    val durdddir = new File(rddDirName)
-    if (!durdddir.mkdir) {
-      throw new DurableException(s"Durable RDD directory ${durdddir.toString} cannot be created")
+    val dir = new File(rddDirName)
+    if (!dir.mkdir) {
+      throw new DurableException(s"Durable RDD directory ${dir.toString} cannot be created")
     }
   }
 
   def deleteRddDir(rddDirName: String) {
-    val durdddir = new File(rddDirName)
-    if (durdddir.exists) {
-      FileUtils.deleteDirectory(durdddir)
+    val dir = new File(rddDirName)
+    if (dir.exists) {
+      FileUtils.deleteDirectory(dir)
     }
   }
 
-  def genDurableFileName(splitId: Int)(mempidx: Long): String = {
+  def genDurableFileName(splitId: Long)(mempidx: Long): String = {
     durableFileNameTemplate.format(splitId, mempidx)
   }
 
+  def collectMemPoolPartitionList(path: String): Option[Array[Partition]] = {
+    val paridset = new mutable.TreeSet[Int]
+    val dir = new File(path)
+    if (dir.isDirectory) {
+      val flst = dir.listFiles.filter(_.isDirectory)
+      for (file <- flst) {
+        file.toString match {
+          case durableFileNamePartitionRegex(paridx) => {
+            paridset += paridx.toInt
+          }
+          case _ =>
+        }
+      }
+    }
+    Option(paridset.toArray.map(x => new Partition { val index = x }))
+  }
+
   def collectMemPoolFileList(durddir: String, memFileNameGen: (Long)=>String): Option[Array[File]] = {
     val flist: ArrayBuffer[File] = new ArrayBuffer[File]
     var idx: Long = 0L
@@ -184,6 +210,29 @@ object DurableRDD {
     }
   }
 
+  def prepareDurablePartition[D: ClassTag, T: ClassTag] (path: String,
+                              serviceName: String, durableTypes: Array[DurableType],
+                              entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long,
+                              partitionPoolSize: Long,
+                              func: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D]
+                             ) (context: TaskContext, iterator: Iterator[T]): Array[File] = {
+    val outsess = MneDurableOutputSession[D](serviceName,
+      durableTypes, entityFactoryProxies, slotKeyId,
+      partitionPoolSize, path,
+      genDurableFileName(context.partitionId)_)
+    try {
+      for (item <- iterator) {
+        func(item, outsess) match {
+          case Some(res) => outsess.post(res)
+          case None =>
+        }
+      }
+    } finally {
+      outsess.close()
+    }
+    outsess.memPools.toArray
+  }
+
   def apply[D: ClassTag, T: ClassTag] (
       rdd: RDD[T],
       serviceName: String, durableTypes: Array[DurableType],
@@ -191,21 +240,22 @@ object DurableRDD {
       partitionPoolSize: Long,
       f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D],
       preservesPartitioning: Boolean = false) = {
-//    val sc: SparkContext = rdd.context
+    val sc: SparkContext = rdd.context
+    val cleanF = f // sc.clean(f)
     val ret = new DurableRDD[D, T](rdd.context , List(new OneToOneDependency(rdd)),
       serviceName, durableTypes, entityFactoryProxies, slotKeyId,
-      partitionPoolSize, getDurableDir(rdd.context).get, f, preservesPartitioning)
+      partitionPoolSize, getDurableDir(sc).get, cleanF, preservesPartitioning)
     //sc.cleaner.foreach(_.registerRDDForCleanup(ret))
     ret
   }
 
   def apply[D: ClassTag] (
-      sc: SparkContext, pathname: String,
+      sc: SparkContext, path: String,
       serviceName: String, durableTypes: Array[DurableType],
       entityFactoryProxies: Array[EntityFactoryProxy], slotKeyId: Long) = {
     val ret = new DurableRDD[D, Unit](sc, null,
       serviceName, durableTypes, entityFactoryProxies, slotKeyId,
-      1024*1024*1024L, pathname, null)
+      1024*1024*1024L, path, null)
     ret
   }
 

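A side note on the file naming scheme the patch relies on: each partition writes pools named mem_<partition>_<pool>.mne through genDurableFileName, and the input-only path recovers partition ids by matching the first pool (index 0000000000) with durableFileNamePartitionRegex. Below is a small Spark-free sketch of that round trip, using the same template and regex as above; the partition number 3 is arbitrary.

    object NamingSketch {
      // Copied from object DurableRDD for illustration.
      val durableFileNameTemplate = "mem_%010d_%010d.mne"
      val durableFileNamePartitionRegex = raw"mem_(\d{10})_0000000000.mne".r

      def main(args: Array[String]): Unit = {
        // Write side: partition 3 emits its first two pool files.
        val names = Seq(durableFileNameTemplate.format(3, 0),
                        durableFileNameTemplate.format(3, 1))
        // Read side: only the first pool of a partition identifies it.
        val partitions = names.collect { case durableFileNamePartitionRegex(id) => id.toInt }
        println(names)      // List(mem_0000000003_0000000000.mne, mem_0000000003_0000000001.mne)
        println(partitions) // List(3)
      }
    }
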
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/636532e1/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
index 7915d5b..587b957 100644
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
+++ b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableRDDFunctions.scala
@@ -17,6 +17,7 @@
 
 package org.apache.mnemonic.spark.rdd
 
+import java.io.File
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 import scala.language.implicitConversions
@@ -40,14 +41,22 @@ class DurableRDDFunctions[T: ClassTag](rdd: RDD[T]) extends Serializable {
       partitionPoolSize, f, preservesPartitioning)
   }
 
-  def saveAsMnemonic[D: ClassTag] (dir: String,
+  def saveAsMnemonic[D: ClassTag] (path: String,
                      serviceName: String,
                      durableTypes: Array[DurableType],
                      entityFactoryProxies: Array[EntityFactoryProxy],
                      slotKeyId: Long,
                      partitionPoolSize: Long,
                      f: (T, ObjectCreator[D, NonVolatileMemAllocator]) => Option[D]) {
-    //TODO: implement export operationl
+    val dir = new File(path)
+    if (!dir.exists) {
+      dir.mkdir
+    }
+    val cleanF = f // rdd.context.clean(f)
+    val func = DurableRDD.prepareDurablePartition[D, T] (path,
+      serviceName, durableTypes, entityFactoryProxies, slotKeyId,
+      partitionPoolSize, cleanF)_
+    rdd.context.runJob(rdd, func)
   }
 }
 

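The newly implemented export path in saveAsMnemonic can be exercised roughly as follows. The lambda posts each element into the durable output session unchanged; the directory, service name, slot key id and pool size are placeholder values, and the implicit conversion from RDD[T] is assumed to come from the DurableRDDFunctions companion object in this same file.

    import org.apache.mnemonic.{DurableType, EntityFactoryProxy, NonVolatileMemAllocator}
    import org.apache.mnemonic.sessions.ObjectCreator
    import org.apache.mnemonic.spark.rdd.DurableRDDFunctions._
    import org.apache.spark.{SparkConf, SparkContext}

    object DurableWriteSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("durable-write"))
        val nums = sc.parallelize(1L to 1000L, 4)
        // Each partition runs as a job task (rdd.context.runJob above) that
        // streams its elements into per-partition .mne pool files.
        nums.saveAsMnemonic[Long](
          "/tmp/durable-rdd-data",       // assumed output directory
          "pmalloc",                     // assumed memory allocation service
          Array(DurableType.LONG),
          Array[EntityFactoryProxy](),   // assumed: no proxies for primitive values
          2L,                            // assumed slot key id
          64L * 1024 * 1024,             // assumed pool size per partition
          (v: Long, _: ObjectCreator[Long, NonVolatileMemAllocator]) => Some(v))
        sc.stop()
      }
    }

Data written this way can then be read back with the sc.mnemonic[...] call shown earlier, since both sides share the mem_<partition>_<pool>.mne naming convention.
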
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/636532e1/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableSparkFunctions.scala
----------------------------------------------------------------------
diff --git a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableSparkFunctions.scala b/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableSparkFunctions.scala
deleted file mode 100644
index 54d9f27..0000000
--- a/mnemonic-spark/mnemonic-spark-core/src/main/scala/org/apache/mnemonic/spark/rdd/DurableSparkFunctions.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mnemonic.spark
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark._
-import scala.reflect.ClassTag
-import scala.language.implicitConversions
-import org.apache.mnemonic.DurableType
-import org.apache.mnemonic.EntityFactoryProxy
-import org.apache.mnemonic.spark.rdd.DurableRDD
-
-class DurableSparkFunctions(sc: SparkContext) extends Serializable {
-
-  def mnemonic[D: ClassTag] (pathname: String,
-                     serviceName: String,
-                     durableTypes: Array[DurableType],
-                     entityFactoryProxies: Array[EntityFactoryProxy],
-                     slotKeyId: Long) = {
-    DurableRDD[D](sc, pathname: String,
-      serviceName, durableTypes, entityFactoryProxies, slotKeyId)
-  }
-}
-
-object DurableSparkFunctions {
-  implicit def addDurableFunctions(sc: SparkContext) = new DurableSparkFunctions(sc)
-}

