Return-Path: X-Original-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F0D99D55C for ; Sat, 7 Jul 2012 21:49:07 +0000 (UTC) Received: (qmail 33633 invoked by uid 500); 7 Jul 2012 21:49:07 -0000 Delivered-To: apmail-incubator-crunch-commits-archive@incubator.apache.org Received: (qmail 33564 invoked by uid 500); 7 Jul 2012 21:49:07 -0000 Mailing-List: contact crunch-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: crunch-dev@incubator.apache.org Delivered-To: mailing list crunch-commits@incubator.apache.org Received: (qmail 33354 invoked by uid 99); 7 Jul 2012 21:49:07 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 07 Jul 2012 21:49:07 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id F0BC4C8C2; Sat, 7 Jul 2012 21:49:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: crunch-commits@incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [11/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core Message-Id: <20120707214906.F0BC4C8C2@tyr.zones.apache.org> Date: Sat, 7 Jul 2012 21:49:06 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java new file mode 100644 index 0000000..5780da8 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.crunch.impl.mr.plan;

import java.util.List;

import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configuration;

import org.apache.crunch.DoFn;
import org.apache.crunch.Source;
import org.apache.crunch.impl.mr.run.NodeContext;
import org.apache.crunch.impl.mr.run.RTNode;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

/**
 * A node in the planner's tree of {@code DoFn} stages for a single job.
 * <p>
 * A node is an <em>input node</em> when it was created from a {@link Source}
 * (see {@link #isInputNode()}) and an <em>output node</em> when it carries an
 * output {@link Converter} (see {@link #isOutputNode()}). Planned trees are
 * converted into runtime {@link RTNode} trees by {@link #toRTNode}.
 */
public class DoNode {

  /**
   * Shared immutable child list for leaf nodes (grouping and output nodes).
   * Calling {@link #addChild} on such a node fails with the
   * {@code UnsupportedOperationException} thrown by {@link ImmutableList}.
   */
  private static final List<DoNode> NO_CHILDREN = ImmutableList.of();

  private final DoFn fn;
  private final String name;
  private final PType<?> ptype;
  private final List<DoNode> children;
  private final Converter outputConverter;
  private final Source<?> source;
  private String outputName;

  private DoNode(DoFn fn, String name, PType<?> ptype, List<DoNode> children,
      Converter outputConverter, Source<?> source) {
    this.fn = fn;
    this.name = name;
    this.ptype = ptype;
    this.children = children;
    this.outputConverter = outputConverter;
    this.source = source;
  }

  /** Returns a fresh mutable child list for nodes that may receive children. */
  private static List<DoNode> allowsChildren() {
    return Lists.newArrayList();
  }

  /**
   * Creates a leaf node that writes grouped output, using the type's output
   * map function and its grouping converter.
   */
  public static DoNode createGroupingNode(String name, PGroupedTableType<?, ?> ptype) {
    DoFn<?, ?> fn = ptype.getOutputMapFn();
    return new DoNode(fn, name, ptype, NO_CHILDREN, ptype.getGroupingConverter(), null);
  }

  /**
   * Creates a leaf output node using the type's output map function and its
   * converter.
   */
  public static DoNode createOutputNode(String name, PType<?> ptype) {
    Converter outputConverter = ptype.getConverter();
    DoFn<?, ?> fn = ptype.getOutputMapFn();
    return new DoNode(fn, name, ptype, NO_CHILDREN, outputConverter, null);
  }

  /** Creates an intermediate node that applies {@code function} and may have children. */
  public static DoNode createFnNode(String name, DoFn<?, ?> function, PType<?> ptype) {
    return new DoNode(function, name, ptype, allowsChildren(), null, null);
  }

  /**
   * Creates an input node for {@code source}, named after the source and using
   * the source type's input map function.
   */
  public static DoNode createInputNode(Source<?> source) {
    PType<?> ptype = source.getType();
    DoFn<?, ?> fn = ptype.getInputMapFn();
    return new DoNode(fn, source.toString(), ptype, allowsChildren(), null, source);
  }

  /** Returns {@code true} when this node was created from a {@link Source}. */
  public boolean isInputNode() {
    return source != null;
  }

  /** Returns {@code true} when this node has an output converter. */
  public boolean isOutputNode() {
    return outputConverter != null;
  }

  public String getName() {
    return name;
  }

  /** Returns the live child list (immutable for leaf nodes). */
  public List<DoNode> getChildren() {
    return children;
  }

  /** Returns the input source, or {@code null} for non-input nodes. */
  public Source<?> getSource() {
    return source;
  }

  public PType<?> getPType() {
    return ptype;
  }

  /**
   * Adds {@code node} as a child unless it is already present.
   *
   * @return this node, for chaining
   * @throws UnsupportedOperationException if this is a leaf (grouping/output) node
   */
  public DoNode addChild(DoNode node) {
    if (!children.contains(node)) {
      this.children.add(node);
    }
    return this;
  }

  /**
   * Sets the named output this node writes to.
   *
   * @throws IllegalStateException if this node has no output converter
   */
  public void setOutputName(String outputName) {
    if (outputConverter == null) {
      throw new IllegalStateException(
          "Cannot set output name w/o output converter: " + outputName);
    }
    this.outputName = outputName;
  }

  /**
   * Recursively converts this node and its children into runtime nodes,
   * configuring each node's {@code DoFn} with {@code conf} along the way.
   *
   * @param inputNode whether this node is a root that reads job input; only
   *     roots get an input converter
   * @param nodeContext MAP roots use the type's converter; any other context
   *     treats the type as a {@link PGroupedTableType} and uses its grouping
   *     converter
   */
  public RTNode toRTNode(boolean inputNode, Configuration conf, NodeContext nodeContext) {
    List<RTNode> childRTNodes = Lists.newArrayList();
    fn.configure(conf);
    for (DoNode child : children) {
      childRTNodes.add(child.toRTNode(false, conf, nodeContext));
    }

    Converter inputConverter = null;
    if (inputNode) {
      if (nodeContext == NodeContext.MAP) {
        inputConverter = ptype.getConverter();
      } else {
        inputConverter = ((PGroupedTableType<?, ?>) ptype).getGroupingConverter();
      }
    }
    return new RTNode(fn, name, childRTNodes, inputConverter, outputConverter, outputName);
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof DoNode)) {
      return false;
    }
    DoNode o = (DoNode) other;
    // source and outputConverter are compared by identity; hashCode() hashes the
    // same references, so equal nodes always hash equally.
    return name.equals(o.name) && fn.equals(o.fn) && source == o.source
        && outputConverter == o.outputConverter;
  }

  @Override
  public int hashCode() {
    HashCodeBuilder hcb = new HashCodeBuilder();
    return hcb.append(name).append(fn).append(source)
        .append(outputConverter).toHashCode();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java b/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java new file mode 100644 index 0000000..55f690a --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.crunch.impl.mr.plan; + +import java.util.List; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +/** + * Visitor that traverses the {@code DoNode} instances in a job and builds + * a String that identifies the stages of the pipeline that belong to + * this job. + */ +public class JobNameBuilder { + + private static final Joiner JOINER = Joiner.on("+"); + private static final Joiner CHILD_JOINER = Joiner.on("/"); + + private String pipelineName; + List rootStack = Lists.newArrayList(); + + public JobNameBuilder(final String pipelineName){ + this.pipelineName = pipelineName; + } + + public void visit(DoNode node) { + visit(node, rootStack); + } + + public void visit(List nodes) { + visit(nodes, rootStack); + } + + private void visit(List nodes, List stack) { + if (nodes.size() == 1) { + visit(nodes.get(0), stack); + } else { + List childStack = Lists.newArrayList(); + for (int i = 0; i < nodes.size(); i++) { + DoNode node = nodes.get(i); + List subStack = Lists.newArrayList(); + visit(node, subStack); + if (!subStack.isEmpty()) { + childStack.add("[" + JOINER.join(subStack) + "]"); + } + } + if (!childStack.isEmpty()) { + stack.add("[" + CHILD_JOINER.join(childStack) + "]"); + } + } + } + + private void visit(DoNode node, List stack) { + String name = node.getName(); + if (!name.isEmpty()) { + stack.add(node.getName()); + } + visit(node.getChildren(), stack); + } + + public String build() { + return String.format("%s: %s", pipelineName, JOINER.join(rootStack)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java new file mode 100644 index 0000000..165641a --- /dev/null +++ 
b/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.plan; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; + +import org.apache.crunch.Pipeline; +import org.apache.crunch.Target; +import org.apache.crunch.impl.mr.collect.DoTableImpl; +import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; +import org.apache.crunch.impl.mr.exec.CrunchJob; +import org.apache.crunch.impl.mr.run.CrunchCombiner; +import org.apache.crunch.impl.mr.run.CrunchInputFormat; +import org.apache.crunch.impl.mr.run.CrunchMapper; +import org.apache.crunch.impl.mr.run.CrunchReducer; +import org.apache.crunch.impl.mr.run.NodeContext; +import org.apache.crunch.impl.mr.run.RTNode; +import org.apache.crunch.util.DistCache; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import 
com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * Describes one MapReduce job before it is built: either a map-reduce job
 * centred on a single grouping ({@link #createMapReduceJob}) or a map-only job
 * ({@link #createMapOnlyJob}). It is built lazily into a {@link CrunchJob} by
 * {@link #getCrunchJob}, together with the jobs it depends on.
 */
public class JobPrototype {

  public static JobPrototype createMapReduceJob(PGroupedTableImpl<?, ?> group,
      Set<NodePath> inputs, Path workingPath) {
    return new JobPrototype(inputs, group, workingPath);
  }

  public static JobPrototype createMapOnlyJob(
      HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
    return new JobPrototype(mapNodePaths, workingPath);
  }

  private final Set<NodePath> mapNodePaths;
  private final PGroupedTableImpl<?, ?> group;
  private final Set<JobPrototype> dependencies = Sets.newHashSet();
  private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
  private final Path workingPath;

  // Set at construction for map-only jobs, via addReducePaths for map-reduce jobs.
  private HashMultimap<Target, NodePath> targetsToNodePaths;
  // Combine-fn table found while walking paths (see walkPath); null if none.
  private DoTableImpl<?, ?> combineFnTable;

  // Memoized result of build(); null until getCrunchJob is first called.
  private CrunchJob job;

  private JobPrototype(Set<NodePath> inputs, PGroupedTableImpl<?, ?> group,
      Path workingPath) {
    this.mapNodePaths = ImmutableSet.copyOf(inputs);
    this.group = group;
    this.workingPath = workingPath;
    this.targetsToNodePaths = null;
  }

  private JobPrototype(HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
    this.group = null;
    this.mapNodePaths = null;
    this.workingPath = workingPath;
    this.targetsToNodePaths = outputPaths;
  }

  /**
   * Registers the reduce-side output paths of a map-reduce job.
   *
   * @throws IllegalStateException if this is a map-only job
   */
  public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) {
    if (group == null) {
      throw new IllegalStateException(
          "Cannot add a reduce phase to a map-only job");
    }
    this.targetsToNodePaths = outputPaths;
  }

  public void addDependency(JobPrototype dependency) {
    this.dependencies.add(dependency);
  }

  /**
   * Builds (once) the {@link CrunchJob} for this prototype and registers the
   * jobs of all dependencies as depending jobs.
   */
  public CrunchJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline)
      throws IOException {
    if (job == null) {
      job = build(jarClass, conf, pipeline);
      for (JobPrototype proto : dependencies) {
        job.addDependingJob(proto.getCrunchJob(jarClass, conf, pipeline));
      }
    }
    return job;
  }

  private CrunchJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline)
      throws IOException {
    if (targetsToNodePaths == null) {
      // Previously an opaque NullPointerException further down.
      throw new IllegalStateException(
          "No output paths registered for this job; addReducePaths must be called before building");
    }
    Job mrJob = new Job(conf);
    Configuration jobConf = mrJob.getConfiguration();
    jobConf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
    mrJob.setJarByClass(jarClass);

    Path outputPath = new Path(workingPath, "output");
    MSCROutputHandler outputHandler = new MSCROutputHandler(mrJob, outputPath,
        group == null);
    Set<DoNode> outputNodes = buildOutputNodes(outputHandler);

    mrJob.setMapperClass(CrunchMapper.class);
    List<DoNode> inputNodes;
    DoNode reduceNode = null;
    if (group != null) {
      mrJob.setReducerClass(CrunchReducer.class);
      List<DoNode> reduceNodes = Lists.newArrayList(outputNodes);
      serialize(reduceNodes, jobConf, workingPath, NodeContext.REDUCE);
      reduceNode = reduceNodes.get(0);

      // combineFnTable was set (or cleared) while walking the output paths above.
      if (combineFnTable != null) {
        mrJob.setCombinerClass(CrunchCombiner.class);
        DoNode combinerInputNode = group.createDoNode();
        DoNode combineNode = combineFnTable.createDoNode();
        combineNode.addChild(group.getGroupingNode());
        combinerInputNode.addChild(combineNode);
        serialize(ImmutableList.of(combinerInputNode), jobConf, workingPath,
            NodeContext.COMBINE);
      }

      group.configureShuffle(mrJob);

      DoNode mapOutputNode = group.getGroupingNode();
      Set<DoNode> mapNodes = Sets.newHashSet();
      for (NodePath nodePath : mapNodePaths) {
        // Advance these one step, since we've already configured
        // the grouping node, and the PGroupedTableImpl is the tail
        // of the NodePath.
        Iterator<PCollectionImpl<?>> iter = nodePath.descendingIterator();
        iter.next();
        mapNodes.add(walkPath(iter, mapOutputNode));
      }
      inputNodes = Lists.newArrayList(mapNodes);
    } else { // No grouping
      mrJob.setNumReduceTasks(0);
      inputNodes = Lists.newArrayList(outputNodes);
    }
    serialize(inputNodes, jobConf, workingPath, NodeContext.MAP);
    configureInputSources(mrJob, inputNodes);
    mrJob.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));

    return new CrunchJob(mrJob, outputPath, outputHandler);
  }

  /**
   * Creates one output node per target, registers it with {@code outputHandler},
   * and walks every path feeding that target back towards its head.
   *
   * @return the root nodes reached by walking the output paths
   */
  private Set<DoNode> buildOutputNodes(MSCROutputHandler outputHandler) {
    Set<DoNode> outputNodes = Sets.newHashSet();
    for (Target target : targetsToNodePaths.keySet()) {
      DoNode node = null;
      for (NodePath nodePath : targetsToNodePaths.get(target)) {
        if (node == null) {
          PCollectionImpl<?> collect = nodePath.tail();
          node = DoNode.createOutputNode(target.toString(), collect.getPType());
          outputHandler.configureNode(node, target);
        }
        outputNodes.add(walkPath(nodePath.descendingIterator(), node));
      }
    }
    return outputNodes;
  }

  /**
   * Configures the job's input sources. A single input uses id {@code -1}; with
   * several inputs each gets its list index and {@link CrunchInputFormat} is used.
   */
  private static void configureInputSources(Job mrJob, List<DoNode> inputNodes)
      throws IOException {
    if (inputNodes.size() == 1) {
      DoNode inputNode = inputNodes.get(0);
      checkIsInputNode(inputNode);
      inputNode.getSource().configureSource(mrJob, -1);
    } else {
      for (int i = 0; i < inputNodes.size(); i++) {
        DoNode inputNode = inputNodes.get(i);
        checkIsInputNode(inputNode);
        inputNode.getSource().configureSource(mrJob, i);
      }
      mrJob.setInputFormatClass(CrunchInputFormat.class);
    }
  }

  /** Fails with a descriptive message instead of an NPE when a root has no source. */
  private static void checkIsInputNode(DoNode inputNode) {
    if (!inputNode.isInputNode()) {
      throw new IllegalStateException(
          "Map-side root node has no input source: " + inputNode.getName());
    }
  }

  private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath,
      NodeContext context) throws IOException {
    List<RTNode> rtNodes = Lists.newArrayList();
    for (DoNode node : nodes) {
      rtNodes.add(node.toRTNode(true, conf, context));
    }
    Path path = new Path(workingPath, context.toString());
    DistCache.write(conf, path, rtNodes);
  }

  private String createJobName(String pipelineName, List<DoNode> mapNodes, DoNode reduceNode) {
    JobNameBuilder builder = new JobNameBuilder(pipelineName);
    builder.visit(mapNodes);
    if (reduceNode != null) {
      builder.visit(reduceNode);
    }
    return builder.build();
  }

  /**
   * Walks {@code iter}, creating (or reusing) the DoNode for each collection and
   * attaching the previously visited node as its child.
   * <p>
   * Also tracks {@link #combineFnTable}: a {@code DoTableImpl} with a combine fn
   * is remembered, and while one is remembered, any collection that is not a
   * {@code PGroupedTableImpl} clears it (even another combine-fn table).
   *
   * @return the node for the last collection visited (or {@code working} if
   *     the iterator is empty)
   */
  private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
    while (iter.hasNext()) {
      PCollectionImpl<?> collect = iter.next();
      if (combineFnTable != null && !(collect instanceof PGroupedTableImpl)) {
        combineFnTable = null;
      } else if (collect instanceof DoTableImpl
          && ((DoTableImpl<?, ?>) collect).hasCombineFn()) {
        combineFnTable = (DoTableImpl<?, ?>) collect;
      }
      if (!nodes.containsKey(collect)) {
        nodes.put(collect, collect.createDoNode());
      }
      DoNode parent = nodes.get(collect);
      parent.addChild(working);
      working = parent;
    }
    return working;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java b/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java new file mode 100644 index 0000000..af193f4 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.plan; + +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; + +import org.apache.crunch.Target; +import org.apache.crunch.io.MapReduceTarget; +import org.apache.crunch.io.OutputHandler; +import org.apache.crunch.io.PathTarget; +import org.apache.crunch.types.PType; +import com.google.common.collect.Lists; + +public class MSCROutputHandler implements OutputHandler { + + private final Job job; + private final Path path; + private final boolean mapOnlyJob; + + private DoNode workingNode; + private List multiPaths; + + public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) { + this.job = job; + this.path = outputPath; + this.mapOnlyJob = mapOnlyJob; + this.multiPaths = Lists.newArrayList(); + } + + public void configureNode(DoNode node, Target target) { + workingNode = node; + target.accept(this, node.getPType()); + } + + public boolean configure(Target target, PType ptype) { + if (target instanceof MapReduceTarget && target instanceof PathTarget) { + String name = PlanningParameters.MULTI_OUTPUT_PREFIX + multiPaths.size(); + multiPaths.add(((PathTarget) target).getPath()); + workingNode.setOutputName(name); + ((MapReduceTarget) target).configureForMapReduce(job, ptype, path, name); + return true; + } + if (target instanceof MapReduceTarget) { + ((MapReduceTarget) target).configureForMapReduce(job, ptype, null, null); + return true; + } + return false; + } + + public boolean isMapOnlyJob() { + return mapOnlyJob; + } + + public List getMultiPaths() { + return multiPaths; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java new file mode 100644 index 0000000..29aeafd --- 
/dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -0,0 +1,376 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.crunch.impl.mr.plan;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;

import org.apache.crunch.Source;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.mr.collect.DoCollectionImpl;
import org.apache.crunch.impl.mr.collect.DoTableImpl;
import org.apache.crunch.impl.mr.collect.InputCollection;
import org.apache.crunch.impl.mr.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
import org.apache.crunch.impl.mr.collect.UnionCollection;
import org.apache.crunch.impl.mr.exec.MRExecutor;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * Plans the MapReduce jobs needed to materialize a pipeline's outputs.
 * <p>
 * Each grouping ({@link PGroupedTableImpl}) becomes a map-reduce
 * {@link JobPrototype}. Outputs that do not need a shuffle become map-only
 * jobs. Groupings that depend on other groupings are split at an intermediate
 * output and chained as job dependencies.
 */
public class MSCRPlanner {

  // Used to ensure that we always build pipelines starting from the deepest outputs, which
  // helps ensure that we handle intermediate outputs correctly. Ties at the same depth are
  // ordered by hashCode. Because hash codes can collide, this comparator is NOT consistent
  // with equals, so it is only used to sort and never as the key order of a sorted map
  // (which would silently merge two distinct outputs into one entry).
  private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR =
      new Comparator<PCollectionImpl<?>>() {
        @Override
        public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
          // Deepest first; then by descending hashCode.
          int cmp = compareInts(right.getDepth(), left.getDepth());
          if (cmp == 0) {
            cmp = compareInts(right.hashCode(), left.hashCode());
          }
          return cmp;
        }
      };

  /** Overflow-free three-way int comparison (Java 6 has no Integer.compare). */
  private static int compareInts(int a, int b) {
    return a < b ? -1 : (a == b ? 0 : 1);
  }

  private final MRPipeline pipeline;
  // Keyed by the collections' own equals/hashCode so distinct outputs never collapse;
  // depth ordering is applied only when iterating (see outputsInDepthOrder).
  private final Map<PCollectionImpl<?>, Set<Target>> outputs;

  public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) {
    this.pipeline = pipeline;
    this.outputs = new HashMap<PCollectionImpl<?>, Set<Target>>(outputs);
  }

  /** Returns a snapshot of the output collections, deepest first. */
  private List<PCollectionImpl<?>> outputsInDepthOrder() {
    List<PCollectionImpl<?>> ordered = Lists.newArrayList(outputs.keySet());
    Collections.sort(ordered, DEPTH_COMPARATOR);
    return ordered;
  }

  /**
   * Builds the job graph for all registered outputs.
   *
   * @return an executor holding one job per distinct {@link JobPrototype}
   * @throws IOException if building a job fails
   */
  public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
    // Constructs all of the node paths, which either start w/an input
    // or a GBK and terminate in an output collection of any type.
    NodeVisitor visitor = new NodeVisitor();
    for (PCollectionImpl<?> output : outputsInDepthOrder()) {
      visitor.visitOutput(output);
    }

    // Pull out the node paths.
    Map<PCollectionImpl<?>, Set<NodePath>> nodePaths = visitor.getNodePaths();

    // Keeps track of the dependencies from collections -> jobs and then
    // between different jobs.
    Map<PCollectionImpl<?>, JobPrototype> assignments = Maps.newHashMap();
    Map<PCollectionImpl<?>, Set<JobPrototype>> jobDependencies =
        new HashMap<PCollectionImpl<?>, Set<JobPrototype>>();

    // Find the set of GBKs that DO NOT depend on any other GBK.
    Set<PGroupedTableImpl<?, ?>> workingGroupings = null;
    while (!(workingGroupings = getWorkingGroupings(nodePaths)).isEmpty()) {

      for (PGroupedTableImpl<?, ?> grouping : workingGroupings) {
        Set<NodePath> mapInputPaths = nodePaths.get(grouping);
        JobPrototype proto = JobPrototype.createMapReduceJob(grouping,
            mapInputPaths, pipeline.createTempPath());
        assignments.put(grouping, proto);
        if (jobDependencies.containsKey(grouping)) {
          for (JobPrototype dependency : jobDependencies.get(grouping)) {
            proto.addDependency(dependency);
          }
        }
      }

      Map<PGroupedTableImpl<?, ?>, Set<NodePath>> dependencyPaths = getDependencyPaths(
          workingGroupings, nodePaths);
      for (Map.Entry<PGroupedTableImpl<?, ?>, Set<NodePath>> entry : dependencyPaths.entrySet()) {
        PGroupedTableImpl<?, ?> grouping = entry.getKey();
        Set<NodePath> currentNodePaths = entry.getValue();

        JobPrototype proto = assignments.get(grouping);
        Set<NodePath> gbkPaths = Sets.newHashSet();
        for (NodePath nodePath : currentNodePaths) {
          PCollectionImpl<?> tail = nodePath.tail();
          if (tail instanceof PGroupedTableImpl) {
            gbkPaths.add(nodePath);
            if (!jobDependencies.containsKey(tail)) {
              jobDependencies.put(tail, Sets.<JobPrototype>newHashSet());
            }
            jobDependencies.get(tail).add(proto);
          }
        }

        if (!gbkPaths.isEmpty()) {
          handleGroupingDependencies(gbkPaths, currentNodePaths);
        }

        // At this point, all of the dependencies for the working groups will be
        // file outputs, and so we can add them all to the JobPrototype-- we now have
        // a complete job.
        HashMultimap<Target, NodePath> reduceOutputs = HashMultimap.create();
        for (NodePath nodePath : currentNodePaths) {
          assignments.put(nodePath.tail(), proto);
          for (Target target : outputs.get(nodePath.tail())) {
            reduceOutputs.put(target, nodePath);
          }
        }
        proto.addReducePaths(reduceOutputs);

        // We've processed this GBK-- remove it from the set of nodePaths we
        // need to process in the next step.
        nodePaths.remove(grouping);
      }
    }

    // Process any map-only jobs that are remaining.
    for (Map.Entry<PCollectionImpl<?>, Set<NodePath>> entry : nodePaths.entrySet()) {
      PCollectionImpl<?> collect = entry.getKey();
      if (!assignments.containsKey(collect)) {
        HashMultimap<Target, NodePath> mapOutputs = HashMultimap.create();
        for (NodePath nodePath : entry.getValue()) {
          for (Target target : outputs.get(nodePath.tail())) {
            mapOutputs.put(target, nodePath);
          }
        }
        JobPrototype proto = JobPrototype.createMapOnlyJob(mapOutputs,
            pipeline.createTempPath());

        if (jobDependencies.containsKey(collect)) {
          for (JobPrototype dependency : jobDependencies.get(collect)) {
            proto.addDependency(dependency);
          }
        }
        assignments.put(collect, proto);
      }
    }

    MRExecutor exec = new MRExecutor(jarClass);
    for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
      exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
    }
    return exec;
  }

  /**
   * For each working grouping, collects the node paths (belonging to other
   * collections) whose head is that grouping.
   */
  private Map<PGroupedTableImpl<?, ?>, Set<NodePath>> getDependencyPaths(
      Set<PGroupedTableImpl<?, ?>> workingGroupings,
      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
    Map<PGroupedTableImpl<?, ?>, Set<NodePath>> dependencyPaths = Maps.newHashMap();
    for (PGroupedTableImpl<?, ?> grouping : workingGroupings) {
      dependencyPaths.put(grouping, Sets.<NodePath>newHashSet());
    }

    // Find the targets that depend on one of the elements of the current
    // working group.
    for (PCollectionImpl<?> target : nodePaths.keySet()) {
      if (!workingGroupings.contains(target)) {
        for (NodePath nodePath : nodePaths.get(target)) {
          if (workingGroupings.contains(nodePath.head())) {
            dependencyPaths.get(nodePath.head()).add(nodePath);
          }
        }
      }
    }
    return dependencyPaths;
  }

  /**
   * Returns the largest index {@code k} such that every path has the same
   * collection at positions {@code 1..k}. Position 0 is skipped and assumed to
   * be the shared grouping head.
   * <p>
   * The scan also stops at the first {@code PGroupedTableImpl} after the head,
   * or at the end of the shortest path.
   */
  private int getSplitIndex(Set<NodePath> currentNodePaths) {
    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
    for (NodePath nodePath : currentNodePaths) {
      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
      iter.next(); // prime this past the initial PGroupedTableImpl
      iters.add(iter);
    }

    // Find the lowest point w/the lowest cost to be the split point for
    // all of the dependent paths.
    boolean end = false;
    int splitIndex = -1;
    while (!end) {
      splitIndex++;
      PCollectionImpl<?> current = null;
      for (Iterator<PCollectionImpl<?>> iter : iters) {
        if (iter.hasNext()) {
          PCollectionImpl<?> next = iter.next();
          if (next instanceof PGroupedTableImpl) {
            end = true;
            break;
          } else if (current == null) {
            current = next;
          } else if (current != next) {
            end = true;
            break;
          }
        } else {
          end = true;
          break;
        }
      }
    }
    // TODO: Add costing calcs here.
    return splitIndex;
  }

  /**
   * Materializes the collection at the split point and rewrites the paths that
   * end in a grouping so that they read from it.
   * <p>
   * The source target for the split collection is chosen in this order:
   * <ol>
   *   <li>a registered target that already is a {@link SourceTarget};</li>
   *   <li>a registered target that converts via {@code asSourceTarget}, which then
   *       replaces the original target;</li>
   *   <li>a new intermediate output.</li>
   * </ol>
   * {@code currentNodePaths} is modified in place. The grouping paths are
   * replaced by their top halves, and the path objects themselves are mutated
   * to start at the new input.
   */
  private void handleGroupingDependencies(Set<NodePath> gbkPaths,
      Set<NodePath> currentNodePaths) throws IOException {
    int splitIndex = getSplitIndex(currentNodePaths);
    PCollectionImpl<?> splitTarget = currentNodePaths.iterator().next().get(splitIndex);
    if (!outputs.containsKey(splitTarget)) {
      outputs.put(splitTarget, Sets.<Target>newHashSet());
    }

    // Raw SourceTarget: its element type is only known as a wildcard capture here.
    SourceTarget srcTarget = null;
    Target targetToReplace = null;
    for (Target t : outputs.get(splitTarget)) {
      if (t instanceof SourceTarget) {
        srcTarget = (SourceTarget) t;
        break;
      } else {
        srcTarget = t.asSourceTarget(splitTarget.getPType());
        if (srcTarget != null) {
          targetToReplace = t;
          break;
        }
      }
    }
    if (targetToReplace != null) {
      outputs.get(splitTarget).remove(targetToReplace);
    } else if (srcTarget == null) {
      srcTarget = pipeline.createIntermediateOutput(splitTarget.getPType());
    }
    outputs.get(splitTarget).add(srcTarget);
    splitTarget.materializeAt(srcTarget);

    PCollectionImpl<?> inputNode = (PCollectionImpl<?>) pipeline.read(srcTarget);
    Set<NodePath> nextNodePaths = Sets.newHashSet();
    for (NodePath nodePath : currentNodePaths) {
      if (gbkPaths.contains(nodePath)) {
        nextNodePaths.add(nodePath.splitAt(splitIndex, inputNode));
      } else {
        nextNodePaths.add(nodePath);
      }
    }
    currentNodePaths.clear();
    currentNodePaths.addAll(nextNodePaths);
  }

  /** Returns the groupings none of whose node paths start at another grouping. */
  private Set<PGroupedTableImpl<?, ?>> getWorkingGroupings(
      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
    Set<PGroupedTableImpl<?, ?>> gbks = Sets.newHashSet();
    for (PCollectionImpl<?> target : nodePaths.keySet()) {
      if (target instanceof PGroupedTableImpl) {
        boolean hasGBKDependency = false;
        for (NodePath nodePath : nodePaths.get(target)) {
          if (nodePath.head() instanceof PGroupedTableImpl) {
            hasGBKDependency = true;
            break;
          }
        }
        if (!hasGBKDependency) {
          gbks.add((PGroupedTableImpl<?, ?>) target);
        }
      }
    }
    return gbks;
  }

  /**
   * Walks the collection graph upwards from each output and records the node
   * paths for each output and for each grouping it encounters.
   * <p>
   * A path ends at an input, at a grouping (which then starts its own set of
   * paths), or at a collection that already has an entry in the path map.
   */
  private static class NodeVisitor implements PCollectionImpl.Visitor {

    private final Map<PCollectionImpl<?>, Set<NodePath>> nodePaths;
    private PCollectionImpl<?> workingNode;
    private NodePath workingPath;

    public NodeVisitor() {
      this.nodePaths = new HashMap<PCollectionImpl<?>, Set<NodePath>>();
    }

    public Map<PCollectionImpl<?>, Set<NodePath>> getNodePaths() {
      return nodePaths;
    }

    public void visitOutput(PCollectionImpl<?> output) {
      nodePaths.put(output, Sets.<NodePath>newHashSet());
      workingNode = output;
      workingPath = new NodePath();
      output.accept(this);
    }

    @Override
    public void visitInputCollection(InputCollection<?> collection) {
      workingPath.close(collection);
      nodePaths.get(workingNode).add(workingPath);
    }

    @Override
    public void visitUnionCollection(UnionCollection<?> collection) {
      PCollectionImpl<?> baseNode = workingNode;
      NodePath basePath = workingPath;
      // Each parent of the union continues its own copy of the path.
      for (PCollectionImpl<?> parent : collection.getParents()) {
        workingPath = new NodePath(basePath);
        workingNode = baseNode;
        processParent(parent);
      }
    }

    @Override
    public void visitDoFnCollection(DoCollectionImpl<?> collection) {
      workingPath.push(collection);
      processParent(collection.getOnlyParent());
    }

    @Override
    public void visitDoTable(DoTableImpl<?, ?> collection) {
      workingPath.push(collection);
      processParent(collection.getOnlyParent());
    }

    @Override
    public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
      // Close the current path at the grouping, then start collecting the
      // grouping's own input paths.
      workingPath.close(collection);
      nodePaths.get(workingNode).add(workingPath);
      workingNode = collection;
      nodePaths.put(workingNode, Sets.<NodePath>newHashSet());
      workingPath = new NodePath(collection);
      processParent(collection.getOnlyParent());
    }

    private void processParent(PCollectionImpl<?> parent) {
      if (!nodePaths.containsKey(parent)) {
        parent.accept(this);
      } else {
        // The parent already heads its own paths; stop here.
        workingPath.close(parent);
        nodePaths.get(workingNode).add(workingPath);
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java new file mode 100644 index 0000000..4ab30ca --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.crunch.impl.mr.plan; + +import java.util.Iterator; +import java.util.LinkedList; + +import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import com.google.common.collect.Lists; + +class NodePath implements Iterable> { + private LinkedList> path; + + public NodePath() { + this.path = Lists.newLinkedList(); + } + + public NodePath(PCollectionImpl tail) { + this.path = Lists.newLinkedList(); + this.path.add(tail); + } + + public NodePath(NodePath other) { + this.path = Lists.newLinkedList(other.path); + } + + public void push(PCollectionImpl stage) { + this.path.push((PCollectionImpl) stage); + } + + public void close(PCollectionImpl head) { + this.path.push(head); + } + + public Iterator> iterator() { + return path.iterator(); + } + + public Iterator> descendingIterator() { + return path.descendingIterator(); + } + + public PCollectionImpl get(int index) { + return path.get(index); + } + + public PCollectionImpl head() { + return path.peekFirst(); + } + + public PCollectionImpl tail() { + return path.peekLast(); + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof NodePath)) { + return false; + } + NodePath nodePath = (NodePath) other; + return path.equals(nodePath.path); + } + + @Override + public int hashCode() { + return 17 + 37 * path.hashCode(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (PCollectionImpl collect : path) { + sb.append(collect.getName() + "|"); + } + sb.deleteCharAt(sb.length() - 1); + return sb.toString(); + } + + public NodePath splitAt(int splitIndex, PCollectionImpl newHead) { + NodePath top = new NodePath(); + for (int i = 0; i <= splitIndex; i++) { + top.path.add(path.get(i)); + } + LinkedList> nextPath = Lists.newLinkedList(); + nextPath.add(newHead); + nextPath.addAll(path.subList(splitIndex + 1, path.size())); + path = nextPath; + return top; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java new file mode 100644 index 0000000..b8e59a3 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.plan; + +public class PlanningParameters { + + public static final String MULTI_OUTPUT_PREFIX = "out"; + + public static final String CRUNCH_WORKING_DIRECTORY = "crunch.work.dir"; + + private PlanningParameters() {} +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java new file mode 100644 index 0000000..fcfbc36 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.run; + +public class CrunchCombiner extends CrunchReducer { + + @Override + protected NodeContext getNodeContext() { + return NodeContext.COMBINE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java new file mode 100644 index 0000000..6b49735 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.run; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.util.ReflectionUtils; + +import org.apache.crunch.io.impl.InputBundle; +import com.google.common.collect.Lists; + +public class CrunchInputFormat extends InputFormat { + + @Override + public List getSplits(JobContext job) throws IOException, + InterruptedException { + List splits = Lists.newArrayList(); + Configuration conf = job.getConfiguration(); + Map>> formatNodeMap = CrunchInputs.getFormatNodeMap(job); + + // First, build a map of InputFormats to Paths + for (Map.Entry>> entry : formatNodeMap.entrySet()) { + InputBundle inputBundle = entry.getKey(); + Job jobCopy = new Job(conf); + InputFormat format = (InputFormat) ReflectionUtils.newInstance( + inputBundle.getInputFormatClass(), jobCopy.getConfiguration()); + for (Map.Entry> nodeEntry : entry.getValue() + .entrySet()) { + Integer nodeIndex = nodeEntry.getKey(); + List paths = nodeEntry.getValue(); + FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths.size()])); + + // Get splits for each input path and tag with InputFormat + // and Mapper types by wrapping in a TaggedInputSplit. 
+ List pathSplits = format.getSplits(jobCopy); + for (InputSplit pathSplit : pathSplits) { + splits.add(new CrunchInputSplit(pathSplit, inputBundle.getInputFormatClass(), + inputBundle.getExtraConfiguration(), nodeIndex, jobCopy.getConfiguration())); + } + } + } + return splits; + } + + @Override + public RecordReader createRecordReader(InputSplit inputSplit, + TaskAttemptContext context) throws IOException, InterruptedException { + return new CrunchRecordReader(inputSplit, context); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java new file mode 100644 index 0000000..b57ca58 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.run; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.SerializationFactory; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.util.ReflectionUtils; + +public class CrunchInputSplit extends InputSplit implements Configurable, Writable { + + private InputSplit inputSplit; + private Class inputFormatClass; + private Map extraConf; + private int nodeIndex; + private Configuration conf; + + public CrunchInputSplit() { + // default constructor + } + + public CrunchInputSplit(InputSplit inputSplit, + Class inputFormatClass, Map extraConf, + int nodeIndex, Configuration conf) { + this.inputSplit = inputSplit; + this.inputFormatClass = inputFormatClass; + this.extraConf = extraConf; + this.nodeIndex = nodeIndex; + this.conf = conf; + } + + public int getNodeIndex() { + return nodeIndex; + } + + public InputSplit getInputSplit() { + return inputSplit; + } + + public Class getInputFormatClass() { + return inputFormatClass; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return inputSplit.getLength(); + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return inputSplit.getLocations(); + } + + public void readFields(DataInput in) throws IOException { + nodeIndex = in.readInt(); + int extraConfSize = in.readInt(); + if (extraConfSize > 0) { + for (int i = 0; i < extraConfSize; i++) { + conf.set(in.readUTF(), in.readUTF()); + } + } + inputFormatClass = 
(Class>) readClass(in); + Class inputSplitClass = (Class) readClass(in); + inputSplit = (InputSplit) ReflectionUtils + .newInstance(inputSplitClass, conf); + SerializationFactory factory = new SerializationFactory(conf); + Deserializer deserializer = factory.getDeserializer(inputSplitClass); + deserializer.open((DataInputStream) in); + inputSplit = (InputSplit) deserializer.deserialize(inputSplit); + } + + private Class readClass(DataInput in) throws IOException { + String className = Text.readString(in); + try { + return conf.getClassByName(className); + } catch (ClassNotFoundException e) { + throw new RuntimeException("readObject can't find class", e); + } + } + + public void write(DataOutput out) throws IOException { + out.writeInt(nodeIndex); + out.writeInt(extraConf.size()); + for (Map.Entry e : extraConf.entrySet()) { + out.writeUTF(e.getKey()); + out.writeUTF(e.getValue()); + } + Text.writeString(out, inputFormatClass.getName()); + Text.writeString(out, inputSplit.getClass().getName()); + SerializationFactory factory = new SerializationFactory(conf); + Serializer serializer = factory.getSerializer(inputSplit.getClass()); + serializer.open((DataOutputStream) out); + serializer.serialize(inputSplit); + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java new file mode 100644 index 0000000..8fa1d56 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.run; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; + +import org.apache.crunch.io.impl.InputBundle; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class CrunchInputs { + + private static final char RECORD_SEP = ','; + private static final char FIELD_SEP = ';'; + private static final Joiner JOINER = Joiner.on(FIELD_SEP); + private static final Splitter SPLITTER = Splitter.on(FIELD_SEP); + + public static void addInputPath(Job job, Path path, + InputBundle inputBundle, int nodeIndex) { + Configuration conf = job.getConfiguration(); + String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString()); + String existing = conf.get(RuntimeParameters.MULTI_INPUTS); + conf.set(RuntimeParameters.MULTI_INPUTS, existing == null ? 
inputs : existing + RECORD_SEP + + inputs); + } + + public static Map>> getFormatNodeMap( + JobContext job) { + Map>> formatNodeMap = Maps.newHashMap(); + Configuration conf = job.getConfiguration(); + for (String input : Splitter.on(RECORD_SEP).split(conf.get(RuntimeParameters.MULTI_INPUTS))) { + List fields = Lists.newArrayList(SPLITTER.split(input)); + InputBundle inputBundle = InputBundle.fromSerialized(fields.get(0)); + if (!formatNodeMap.containsKey(inputBundle)) { + formatNodeMap.put(inputBundle, Maps.> newHashMap()); + } + Integer nodeIndex = Integer.valueOf(fields.get(1)); + if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) { + formatNodeMap.get(inputBundle).put(nodeIndex, + Lists. newLinkedList()); + } + formatNodeMap.get(inputBundle).get(nodeIndex) + .add(new Path(fields.get(2))); + } + return formatNodeMap; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java new file mode 100644 index 0000000..814c8c3 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.run; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.Mapper; + +public class CrunchMapper extends Mapper { + + private static final Log LOG = LogFactory.getLog(CrunchMapper.class); + + private RTNode node; + private CrunchTaskContext ctxt; + private boolean debug; + + @Override + protected void setup(Mapper.Context context) { + List nodes; + this.ctxt = new CrunchTaskContext(context, NodeContext.MAP); + try { + nodes = ctxt.getNodes(); + } catch (IOException e) { + LOG.info("Crunch deserialization error", e); + throw new CrunchRuntimeException(e); + } + if (nodes.size() == 1) { + this.node = nodes.get(0); + } else { + CrunchInputSplit split = (CrunchInputSplit) context.getInputSplit(); + this.node = nodes.get(split.getNodeIndex()); + } + this.debug = ctxt.isDebugRun(); + } + + @Override + protected void map(Object k, Object v, + Mapper.Context context) { + if (debug) { + try { + node.process(k, v); + } catch (Exception e) { + LOG.error("Mapper exception", e); + } + } else { + node.process(k, v); + } + } + + @Override + protected void cleanup(Mapper.Context context) { + node.cleanup(); + ctxt.cleanup(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java new file mode 100644 index 0000000..5967aa9 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.run; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.ReflectionUtils; + +class CrunchRecordReader extends RecordReader { + + private final RecordReader delegate; + + public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) + throws IOException, InterruptedException { + CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit; + InputFormat inputFormat = (InputFormat) ReflectionUtils + .newInstance(crunchSplit.getInputFormatClass(), crunchSplit.getConf()); + this.delegate = inputFormat.createRecordReader( + crunchSplit.getInputSplit(), TaskAttemptContextFactory.create( + crunchSplit.getConf(), context.getTaskAttemptID())); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public K getCurrentKey() throws IOException, InterruptedException { + return delegate.getCurrentKey(); + } + + @Override + public V getCurrentValue() throws IOException, InterruptedException { + return delegate.getCurrentValue(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return delegate.getProgress(); + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit; + InputSplit delegateSplit = crunchSplit.getInputSplit(); + delegate.initialize(delegateSplit, TaskAttemptContextFactory.create( + crunchSplit.getConf(), context.getTaskAttemptID())); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return delegate.nextKeyValue(); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java new file mode 100644 index 0000000..aa5fc95 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.run; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.Reducer; + +public class CrunchReducer extends Reducer { + + private static final Log LOG = LogFactory.getLog(CrunchReducer.class); + + private RTNode node; + private CrunchTaskContext ctxt; + private boolean debug; + + protected NodeContext getNodeContext() { + return NodeContext.REDUCE; + } + + @Override + protected void setup(Reducer.Context context) { + this.ctxt = new CrunchTaskContext(context, getNodeContext()); + try { + List nodes = ctxt.getNodes(); + this.node = nodes.get(0); + } catch (IOException e) { + LOG.info("Crunch deserialization error", e); + throw new CrunchRuntimeException(e); + } + this.debug = ctxt.isDebugRun(); + } + + @Override + protected void reduce(Object key, Iterable values, + Reducer.Context context) { + if (debug) { + try { + node.processIterable(key, values); + } catch (Exception e) { + LOG.error("Reducer exception", e); + } + } else { + node.processIterable(key, values); + } + } + + @Override + protected void cleanup(Reducer.Context context) { + node.cleanup(); + ctxt.cleanup(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java new file mode 100644 index 0000000..9eac51c --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.run; + +public class CrunchRuntimeException extends RuntimeException { + + private boolean logged = false; + + public CrunchRuntimeException(String msg) { + super(msg); + } + + public CrunchRuntimeException(Exception e) { + super(e); + } + + public CrunchRuntimeException(String msg, Exception e) { + super(msg, e); + } + + public boolean wasLogged() { + return logged; + } + + public void markLogged() { + this.logged = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java new file mode 100644 index 0000000..dd04813 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.run; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs; + +import org.apache.crunch.impl.mr.plan.PlanningParameters; +import org.apache.crunch.util.DistCache; + +public class CrunchTaskContext { + + private final TaskInputOutputContext taskContext; + private final NodeContext nodeContext; + private CrunchMultipleOutputs multipleOutputs; + + public CrunchTaskContext( + TaskInputOutputContext taskContext, + NodeContext nodeContext) { + this.taskContext = taskContext; + this.nodeContext = nodeContext; + } + + public TaskInputOutputContext getContext() { + return taskContext; + } + + public NodeContext getNodeContext() { + return nodeContext; + } + + public List getNodes() throws IOException { + Configuration conf = taskContext.getConfiguration(); + Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString()); + @SuppressWarnings("unchecked") + List nodes = (List) DistCache.read(conf, path); + if (nodes != null) { + for (RTNode node : nodes) { + node.initialize(this); + } + } + return nodes; + } + + public boolean isDebugRun() { + Configuration conf = taskContext.getConfiguration(); + return 
conf.getBoolean(RuntimeParameters.DEBUG, false); + } + + public void cleanup() { + if (multipleOutputs != null) { + try { + multipleOutputs.close(); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } catch (InterruptedException e) { + throw new CrunchRuntimeException(e); + } + } + } + + public CrunchMultipleOutputs getMultipleOutputs() { + if (multipleOutputs == null) { + multipleOutputs = new CrunchMultipleOutputs(taskContext); + } + return multipleOutputs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java b/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java new file mode 100644 index 0000000..648e87c --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.run; + +import org.apache.crunch.impl.mr.plan.DoNode; + +/** + * Enum that is associated with a serialized {@link DoNode} instance, so we know + * how to use it within the context of a particular MR job. + * + */ +public enum NodeContext { + MAP, REDUCE, COMBINE; + + public String getConfigurationKey() { + return "crunch.donode." + toString().toLowerCase(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java new file mode 100644 index 0000000..4debfc1 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.run; + +import java.io.Serializable; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.impl.mr.emit.IntermediateEmitter; +import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter; +import org.apache.crunch.impl.mr.emit.OutputEmitter; +import org.apache.crunch.types.Converter; + +public class RTNode implements Serializable { + + private static final Log LOG = LogFactory.getLog(RTNode.class); + + private final String nodeName; + private DoFn fn; + private final List children; + private final Converter inputConverter; + private final Converter outputConverter; + private final String outputName; + + private transient Emitter emitter; + + public RTNode(DoFn fn, String name, List children, + Converter inputConverter, Converter outputConverter, String outputName) { + this.fn = fn; + this.nodeName = name; + this.children = children; + this.inputConverter = inputConverter; + this.outputConverter = outputConverter; + this.outputName = outputName; + } + + public void initialize(CrunchTaskContext ctxt) { + if (emitter != null) { + // Already initialized + return; + } + + fn.setContext(ctxt.getContext()); + for (RTNode child : children) { + child.initialize(ctxt); + } + + if (outputConverter != null) { + if (outputName != null) { + this.emitter = new MultipleOutputEmitter( + outputConverter, ctxt.getMultipleOutputs(), outputName); + } else { + this.emitter = new OutputEmitter( + outputConverter, ctxt.getContext()); + } + } else if (!children.isEmpty()) { + this.emitter = new IntermediateEmitter(children); + } else { + throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName); + } + } + + public boolean isLeafNode() { + return outputConverter != null && children.isEmpty(); + } + + public void process(Object input) { + try { + fn.process(input, emitter); 
+ } catch (CrunchRuntimeException e) { + if (!e.wasLogged()) { + LOG.info(String.format("Crunch exception in '%s' for input: %s", + nodeName, input.toString()), e); + e.markLogged(); + } + throw e; + } + } + + public void process(Object key, Object value) { + process(inputConverter.convertInput(key, value)); + } + + public void processIterable(Object key, Iterable values) { + process(inputConverter.convertIterableInput(key, values)); + } + + public void cleanup() { + fn.cleanup(emitter); + emitter.flush(); + for (RTNode child : children) { + child.cleanup(); + } + } + + @Override + public String toString() { + return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children=" + + children + ", inputConverter=" + inputConverter + + ", outputConverter=" + outputConverter + ", outputName=" + outputName + + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java new file mode 100644 index 0000000..5e534eb --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.run; + +/** + * Parameters used during the runtime execution. + * + */ +public class RuntimeParameters { + + public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets"; + + public static final String MULTI_INPUTS = "crunch.inputs.dir"; + + public static final String DEBUG = "crunch.debug"; + + // Not instantiated + private RuntimeParameters() {} +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java b/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java new file mode 100644 index 0000000..2cfa615 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.run; + +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +/** + * + */ +@SuppressWarnings("unchecked") +public class TaskAttemptContextFactory { + + private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class); + + private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory(); + + public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) { + return INSTANCE.createInternal(conf, taskAttemptId); + } + + private Constructor taskAttemptConstructor; + + private TaskAttemptContextFactory() { + Class implClass = TaskAttemptContext.class; + if (implClass.isInterface()) { + try { + implClass = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); + } catch (ClassNotFoundException e) { + LOG.fatal("Could not find TaskAttemptContextImpl class, exiting", e); + } + } + try { + this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class); + } catch (Exception e) { + LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e); + } + } + + private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) { + try { + return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId); + } catch (Exception e) { + 
LOG.error("Could not construct a TaskAttemptContext instance", e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/At.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/io/At.java b/src/main/java/org/apache/crunch/io/At.java new file mode 100644 index 0000000..2f5fe8b --- /dev/null +++ b/src/main/java/org/apache/crunch/io/At.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Scan; + +import org.apache.crunch.SourceTarget; +import org.apache.crunch.io.avro.AvroFileSourceTarget; +import org.apache.crunch.io.hbase.HBaseSourceTarget; +import org.apache.crunch.io.seq.SeqFileSourceTarget; +import org.apache.crunch.io.seq.SeqFileTableSourceTarget; +import org.apache.crunch.io.text.TextFileSourceTarget; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.avro.AvroType; +import org.apache.crunch.types.writable.Writables; + +/** + * Static factory methods for creating various {@link SourceTarget} types. + * + */ +public class At { + public static AvroFileSourceTarget avroFile(String pathName, AvroType avroType) { + return avroFile(new Path(pathName), avroType); + } + + public static AvroFileSourceTarget avroFile(Path path, AvroType avroType) { + return new AvroFileSourceTarget(path, avroType); + } + + public static HBaseSourceTarget hbaseTable(String table) { + return hbaseTable(table, new Scan()); + } + + public static HBaseSourceTarget hbaseTable(String table, Scan scan) { + return new HBaseSourceTarget(table, scan); + } + + public static SeqFileSourceTarget sequenceFile(String pathName, PType ptype) { + return sequenceFile(new Path(pathName), ptype); + } + + public static SeqFileSourceTarget sequenceFile(Path path, PType ptype) { + return new SeqFileSourceTarget(path, ptype); + } + + public static SeqFileTableSourceTarget sequenceFile(String pathName, PType keyType, + PType valueType) { + return sequenceFile(new Path(pathName), keyType, valueType); + } + + public static SeqFileTableSourceTarget sequenceFile(Path path, PType keyType, + PType valueType) { + PTypeFamily ptf = keyType.getFamily(); + return new SeqFileTableSourceTarget(path, ptf.tableOf(keyType, valueType)); + } + + public static TextFileSourceTarget textFile(String pathName) { + return textFile(new 
Path(pathName)); + } + + public static TextFileSourceTarget textFile(Path path) { + return textFile(path, Writables.strings()); + } + + public static TextFileSourceTarget textFile(String pathName, PType ptype) { + return textFile(new Path(pathName), ptype); + } + + public static TextFileSourceTarget textFile(Path path, PType ptype) { + return new TextFileSourceTarget(path, ptype); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/CompositePathIterable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/io/CompositePathIterable.java b/src/main/java/org/apache/crunch/io/CompositePathIterable.java new file mode 100644 index 0000000..0e4014a --- /dev/null +++ b/src/main/java/org/apache/crunch/io/CompositePathIterable.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +import com.google.common.collect.UnmodifiableIterator; + +public class CompositePathIterable implements Iterable { + + private final FileStatus[] stati; + private final FileSystem fs; + private final FileReaderFactory readerFactory; + + private static final PathFilter FILTER = new PathFilter() { + @Override + public boolean accept(Path path) { + return !path.getName().startsWith("_"); + } + }; + + public static Iterable create(FileSystem fs, Path path, FileReaderFactory readerFactory) throws IOException { + + if (!fs.exists(path)){ + throw new IOException("No files found to materialize at: " + path); + } + + FileStatus[] stati = null; + try { + stati = fs.listStatus(path, FILTER); + } catch (FileNotFoundException e) { + stati = null; + } + if (stati == null) { + throw new IOException("No files found to materialize at: " + path); + } + + if (stati.length == 0) { + return Collections.emptyList(); + } else { + return new CompositePathIterable(stati, fs, readerFactory); + } + + } + + private CompositePathIterable(FileStatus[] stati, FileSystem fs, FileReaderFactory readerFactory) { + this.stati = stati; + this.fs = fs; + this.readerFactory = readerFactory; + } + + @Override + public Iterator iterator() { + + return new UnmodifiableIterator() { + private int index = 0; + private Iterator iter = readerFactory.read(fs, stati[index++].getPath()); + + @Override + public boolean hasNext() { + if (!iter.hasNext()) { + while (index < stati.length) { + iter = readerFactory.read(fs, stati[index++].getPath()); + if (iter.hasNext()) { + return true; + } + } + return false; + } + return true; + } + + @Override + public T next() { + return iter.next(); + } + }; 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/FileReaderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/io/FileReaderFactory.java b/src/main/java/org/apache/crunch/io/FileReaderFactory.java new file mode 100644 index 0000000..5cccb7b --- /dev/null +++ b/src/main/java/org/apache/crunch/io/FileReaderFactory.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io; + +import java.util.Iterator; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public interface FileReaderFactory { + Iterator read(FileSystem fs, Path path); +}