Return-Path: X-Original-To: apmail-mahout-commits-archive@www.apache.org Delivered-To: apmail-mahout-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E06A1193AE for ; Mon, 11 Apr 2016 08:09:30 +0000 (UTC) Received: (qmail 11615 invoked by uid 500); 11 Apr 2016 08:09:29 -0000 Delivered-To: apmail-mahout-commits-archive@mahout.apache.org Received: (qmail 11436 invoked by uid 500); 11 Apr 2016 08:09:29 -0000 Mailing-List: contact commits-help@mahout.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mahout.apache.org Delivered-To: mailing list commits@mahout.apache.org Received: (qmail 11154 invoked by uid 99); 11 Apr 2016 08:09:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Apr 2016 08:09:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9D6A9DFC6E; Mon, 11 Apr 2016 08:09:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: smarthi@apache.org To: commits@mahout.apache.org Date: Mon, 11 Apr 2016 08:09:35 -0000 Message-Id: <109e5ddfa8af4f3baed634f749ce3ced@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [08/50] [abbrv] mahout git commit: Persist only if the dataset has not been cached. Otherwise read back in already cached dataset Persist only if the dataset has not been cached. 
Otherwise read back in already cached dataset Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9c5ee592 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9c5ee592 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9c5ee592 Branch: refs/heads/master Commit: 9c5ee59214a454f7ae25c762bf04bb30bd7982c8 Parents: a1cf7cf Author: Andrew Palumbo Authored: Fri Mar 25 18:10:18 2016 -0400 Committer: Andrew Palumbo Committed: Fri Mar 25 18:10:18 2016 -0400 ---------------------------------------------------------------------- .../mahout/flinkbindings/FlinkEngine.scala | 4 +-- .../drm/CheckpointedFlinkDrm.scala | 32 +++++++++++++++----- 2 files changed, 26 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/9c5ee592/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index af508b3..0640ebe 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -359,9 +359,9 @@ object FlinkEngine extends DistributedEngine { } def generateTypeInformation[K: ClassTag]: TypeInformation[K] = { - implicit val tag = ClassTag[K] + implicit val ktag = classTag[K] - generateTypeInformationFromTag(tag) + generateTypeInformationFromTag(ktag) } private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = { http://git-wip-us.apache.org/repos/asf/mahout/blob/9c5ee592/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index ea96e88..65acbd6 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -77,14 +77,17 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], override val keyClassTag: ClassTag[K] = classTag[K] def cache() = { + implicit val typeInformation = createTypeInformation[(K,Vector)] + implicit val inputFormat = (ds.getType) if (!isCached) { - cacheFileName = System.nanoTime().toString + cacheFileName = persistanceRootDir + System.nanoTime().toString parallelismDeg = ds.getParallelism isCached = true + persist(ds, cacheFileName) } - implicit val typeInformation = createTypeInformation[(K,Vector)] - val _ds = persist(ds, persistanceRootDir + cacheFileName) + val _ds = readPersistedDataSet(cacheFileName, ds) + datasetWrap(_ds) } @@ -99,12 +102,10 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], * @param dataset [[DataSet]] to write to disk * @param path File path to write dataset to * @tparam T Type of the [[DataSet]] elements - * @return [[DataSet]] reading the just written file */ - def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = { + def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): Unit = { val env = dataset.getExecutionEnvironment val outputFormat = new TypeSerializerOutputFormat[T] - val filePath = new Path(path) outputFormat.setOutputFilePath(filePath) @@ -112,14 +113,29 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], dataset.output(outputFormat) env.execute("FlinkTools persist") + } + + /** Read a [[DataSet]] from specified path and returns it as a DataSource for subsequent + * operations. 
+ * + * @param path File path to read dataset from + * @param ds persisted ds to retrieve type information and environment from + * @tparam T key Type of the [[DataSet]] elements + * @return [[DataSet]] reading the just written file + */ + def readPersistedDataSet[T: ClassTag : TypeInformation] + (path: String, ds: DataSet[T]): DataSet[T] = { - val inputFormat = new TypeSerializerInputFormat[T](dataset.getType) + val env = ds.getExecutionEnvironment + val inputFormat = new TypeSerializerInputFormat[T](ds.getType()) + val filePath = new Path(path) inputFormat.setFilePath(filePath) env.createInput(inputFormat) } - // Members declared in org.apache.mahout.math.drm.DrmLike + + // Members declared in org.apache.mahout.math.drm.DrmLike protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows