Return-Path: X-Original-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 53C9CE93C for ; Sat, 26 Jan 2013 01:24:33 +0000 (UTC) Received: (qmail 9565 invoked by uid 500); 26 Jan 2013 01:24:33 -0000 Delivered-To: apmail-incubator-crunch-commits-archive@incubator.apache.org Received: (qmail 9525 invoked by uid 500); 26 Jan 2013 01:24:33 -0000 Mailing-List: contact crunch-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: crunch-dev@incubator.apache.org Delivered-To: mailing list crunch-commits@incubator.apache.org Received: (qmail 9517 invoked by uid 99); 26 Jan 2013 01:24:33 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 26 Jan 2013 01:24:33 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id EC39A825B66; Sat, 26 Jan 2013 01:24:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: crunch-commits@incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-144: Convert Targets to SourceTargets wherever possible to mark a PCollection as materialized, and update the rules on converting a text file target to SourceTargets. Message-Id: <20130126012432.EC39A825B66@tyr.zones.apache.org> Date: Sat, 26 Jan 2013 01:24:32 +0000 (UTC) Updated Branches: refs/heads/master 5cf8142ae -> 9a1c42760 CRUNCH-144: Convert Targets to SourceTargets wherever possible to mark a PCollection as materialized, and update the rules on converting a text file target to SourceTargets. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/9a1c4276 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/9a1c4276 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/9a1c4276 Branch: refs/heads/master Commit: 9a1c42760b5c01a5d1dbdf6ee6bdc4f9fb8ca086 Parents: 5cf8142 Author: Josh Wills Authored: Wed Jan 16 07:42:10 2013 -0800 Committer: Josh Wills Committed: Fri Jan 25 15:47:17 2013 -0800 ---------------------------------------------------------------------- .../java/org/apache/crunch/impl/mr/MRPipeline.java | 14 ++++++-- .../org/apache/crunch/io/text/TextFileTarget.java | 29 +++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/9a1c4276/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index d9545f8..c71ef23 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -162,9 +162,17 @@ public class MRPipeline implements Pipeline { } else { boolean materialized = false; for (Target t : outputTargets.get(c)) { - if (!materialized && t instanceof Source) { - c.materializeAt((SourceTarget) t); - materialized = true; + if (!materialized) { + if (t instanceof SourceTarget) { + c.materializeAt((SourceTarget) t); + materialized = true; + } else { + SourceTarget st = t.asSourceTarget(c.getPType()); + if (st != null) { + c.materializeAt(st); + materialized = true; + } + } } } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/9a1c4276/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java index ec7d521..0c3e6a4 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.io.text; +import org.apache.avro.Schema; import org.apache.crunch.SourceTarget; import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.SequentialFileNamingScheme; @@ -25,8 +26,12 @@ import org.apache.crunch.types.Converter; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.avro.AvroTextOutputFormat; +import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.writable.WritableType; +import org.apache.crunch.types.writable.WritableTypeFamily; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; @@ -72,9 +77,33 @@ public class TextFileTarget extends FileTargetImpl { @Override public SourceTarget asSourceTarget(PType ptype) { + if (!isTextCompatible(ptype)) { + return null; + } if (ptype instanceof PTableType) { return new TextFileTableSourceTarget(path, (PTableType) ptype); } return new TextFileSourceTarget(path, ptype); } + + private boolean isTextCompatible(PType ptype) { + if (AvroTypeFamily.getInstance().equals(ptype.getFamily())) { + AvroType at = (AvroType) ptype; + if (at.getSchema().equals(Schema.create(Schema.Type.STRING))) { + return true; + } + } else if (WritableTypeFamily.getInstance().equals(ptype.getFamily())) { + if (ptype instanceof PTableType) { + PTableType ptt = (PTableType) ptype; + return isText(ptt.getKeyType()) && isText(ptt.getValueType()); + } else { + return isText(ptype); + } + } + return false; + } + + private boolean isText(PType wtype) { + return Text.class.equals(((WritableType) wtype).getSerializationClass()); + } }