Return-Path: X-Original-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B30B29B75 for ; Wed, 11 Jul 2012 05:15:02 +0000 (UTC) Received: (qmail 12309 invoked by uid 500); 11 Jul 2012 05:14:57 -0000 Delivered-To: apmail-incubator-crunch-commits-archive@incubator.apache.org Received: (qmail 11710 invoked by uid 500); 11 Jul 2012 05:14:51 -0000 Mailing-List: contact crunch-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: crunch-dev@incubator.apache.org Delivered-To: mailing list crunch-commits@incubator.apache.org Received: (qmail 11167 invoked by uid 99); 11 Jul 2012 05:14:47 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Jul 2012 05:14:47 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 89E37DAFE; Wed, 11 Jul 2012 05:14:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: crunch-commits@incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [20/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich Message-Id: <20120711051446.89E37DAFE@tyr.zones.apache.org> Date: Wed, 11 Jul 2012 05:14:46 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java deleted file mode 100644 index b678187..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.exec; - -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; -import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl; - -import org.apache.crunch.PipelineResult; -import com.google.common.collect.Lists; - -/** - * - * - */ -public class MRExecutor { - - private static final Log LOG = LogFactory.getLog(MRExecutor.class); - - private final CrunchJobControl control; - - public MRExecutor(Class jarClass) { - this.control = new CrunchJobControl(jarClass.toString()); - } - - public void addJob(CrunchJob job) { - this.control.addJob(job); - } - - public PipelineResult execute() { - try { - Thread controlThread = new Thread(control); - controlThread.start(); - while (!control.allFinished()) { - Thread.sleep(1000); - } - control.stop(); - } catch (InterruptedException e) { - LOG.info(e); - } - List failures = control.getFailedJobList(); - if (!failures.isEmpty()) { - System.err.println(failures.size() + " job failure(s) occurred:"); - for (CrunchControlledJob job : failures) { - System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage()); - } - } - List stages = Lists.newArrayList(); - for (CrunchControlledJob job : control.getSuccessfulJobList()) { - try { - stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters())); - } catch (Exception e) { - LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e); - } - } - return new PipelineResult(stages); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java deleted file mode 100644 index 5780da8..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.plan; - -import java.util.List; - -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.hadoop.conf.Configuration; - -import org.apache.crunch.DoFn; -import org.apache.crunch.Source; -import org.apache.crunch.impl.mr.run.NodeContext; -import org.apache.crunch.impl.mr.run.RTNode; -import org.apache.crunch.types.Converter; -import org.apache.crunch.types.PGroupedTableType; -import org.apache.crunch.types.PType; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -public class DoNode { - - private static final List NO_CHILDREN = ImmutableList.of(); - - private final DoFn fn; - private final String name; - private final PType ptype; - private final List children; - private final Converter outputConverter; - private final Source source; - private String outputName; - - private DoNode(DoFn fn, String name, PType ptype, List children, - Converter outputConverter, Source source) { - this.fn = fn; - this.name = name; - this.ptype = ptype; - this.children = children; - this.outputConverter = outputConverter; - this.source = source; - } - - private static List allowsChildren() { - return Lists.newArrayList(); - } - - public static DoNode createGroupingNode(String name, - PGroupedTableType ptype) { - DoFn fn = ptype.getOutputMapFn(); - return new DoNode(fn, name, ptype, NO_CHILDREN, - ptype.getGroupingConverter(), null); - } - - public static DoNode createOutputNode(String name, PType ptype) { - Converter outputConverter = ptype.getConverter(); - DoFn fn = ptype.getOutputMapFn(); - return new DoNode(fn, name, ptype, NO_CHILDREN, - outputConverter, null); - } - - public static DoNode createFnNode(String name, DoFn function, - PType ptype) { - return new DoNode(function, name, ptype, allowsChildren(), null, null); - } - - public static DoNode createInputNode(Source source) { - PType ptype = source.getType(); - DoFn fn = ptype.getInputMapFn(); - return new DoNode(fn, source.toString(), ptype, allowsChildren(), null, - source); - } - - public boolean isInputNode() { - return source != null; - } - - public boolean isOutputNode() { - return outputConverter != null; - } - - public String getName() { - return name; - } - - public List getChildren() { - return children; - } - - public Source getSource() { - return source; - } - - public PType getPType() { - return ptype; - } - - public DoNode addChild(DoNode node) { - if (!children.contains(node)) { - this.children.add(node); - } - return this; - } - - public void setOutputName(String outputName) { - if (outputConverter == null) { - throw new IllegalStateException( - "Cannot set output name w/o output converter: " + outputName); - } - this.outputName = outputName; - } - - public RTNode toRTNode(boolean inputNode, Configuration conf, NodeContext nodeContext) { - List childRTNodes = Lists.newArrayList(); - fn.configure(conf); - for (DoNode child : children) { - childRTNodes.add(child.toRTNode(false, conf, nodeContext)); - } - - Converter inputConverter = null; - if (inputNode) { - if (nodeContext == NodeContext.MAP) { - inputConverter = ptype.getConverter(); - } else { - inputConverter = ((PGroupedTableType) ptype).getGroupingConverter(); - } - } - return new RTNode(fn, name, childRTNodes, inputConverter, outputConverter, - outputName); - } - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof DoNode)) { - return false; - } - if (this == other) { - return true; - } - DoNode o = (DoNode) other; - return (name.equals(o.name) && fn.equals(o.fn) && source == o.source && - outputConverter == o.outputConverter); - } - - @Override - public int hashCode() { - HashCodeBuilder hcb = new HashCodeBuilder(); - return hcb.append(name).append(fn).append(source) - .append(outputConverter).toHashCode(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java b/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java deleted file mode 100644 index 55f690a..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.plan; - -import java.util.List; - -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; - -/** - * Visitor that traverses the {@code DoNode} instances in a job and builds - * a String that identifies the stages of the pipeline that belong to - * this job. - */ -public class JobNameBuilder { - - private static final Joiner JOINER = Joiner.on("+"); - private static final Joiner CHILD_JOINER = Joiner.on("/"); - - private String pipelineName; - List rootStack = Lists.newArrayList(); - - public JobNameBuilder(final String pipelineName){ - this.pipelineName = pipelineName; - } - - public void visit(DoNode node) { - visit(node, rootStack); - } - - public void visit(List nodes) { - visit(nodes, rootStack); - } - - private void visit(List nodes, List stack) { - if (nodes.size() == 1) { - visit(nodes.get(0), stack); - } else { - List childStack = Lists.newArrayList(); - for (int i = 0; i < nodes.size(); i++) { - DoNode node = nodes.get(i); - List subStack = Lists.newArrayList(); - visit(node, subStack); - if (!subStack.isEmpty()) { - childStack.add("[" + JOINER.join(subStack) + "]"); - } - } - if (!childStack.isEmpty()) { - stack.add("[" + CHILD_JOINER.join(childStack) + "]"); - } - } - } - - private void visit(DoNode node, List stack) { - String name = node.getName(); - if (!name.isEmpty()) { - stack.add(node.getName()); - } - visit(node.getChildren(), stack); - } - - public String build() { - return String.format("%s: %s", pipelineName, JOINER.join(rootStack)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java deleted file mode 100644 index 718ce5e..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.plan; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; - -import org.apache.crunch.Pipeline; -import org.apache.crunch.Target; -import org.apache.crunch.impl.mr.collect.DoTableImpl; -import org.apache.crunch.impl.mr.collect.PCollectionImpl; -import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; -import org.apache.crunch.impl.mr.exec.CrunchJob; -import org.apache.crunch.impl.mr.run.CrunchCombiner; -import org.apache.crunch.impl.mr.run.CrunchInputFormat; -import org.apache.crunch.impl.mr.run.CrunchMapper; -import org.apache.crunch.impl.mr.run.CrunchReducer; -import org.apache.crunch.impl.mr.run.NodeContext; -import org.apache.crunch.impl.mr.run.RTNode; -import org.apache.crunch.util.DistCache; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -public class JobPrototype { - - public static JobPrototype createMapReduceJob(PGroupedTableImpl group, - Set inputs, Path workingPath) { - return new JobPrototype(inputs, group, workingPath); - } - - public static JobPrototype createMapOnlyJob( - HashMultimap mapNodePaths, Path workingPath) { - return new JobPrototype(mapNodePaths, workingPath); - } - - private final Set mapNodePaths; - private final PGroupedTableImpl group; - private final Set dependencies = Sets.newHashSet(); - private final Map, DoNode> nodes = Maps.newHashMap(); - private final Path workingPath; - - private HashMultimap targetsToNodePaths; - private DoTableImpl combineFnTable; - - private CrunchJob job; - - private JobPrototype(Set inputs, PGroupedTableImpl group, - Path workingPath) { - this.mapNodePaths = ImmutableSet.copyOf(inputs); - this.group = group; - this.workingPath = workingPath; - this.targetsToNodePaths = null; - } - - private JobPrototype(HashMultimap outputPaths, Path workingPath) { - this.group = null; - this.mapNodePaths = null; - this.workingPath = workingPath; - this.targetsToNodePaths = outputPaths; - } - - public void addReducePaths(HashMultimap outputPaths) { - if (group == null) { - throw new IllegalStateException( - "Cannot add a reduce phase to a map-only job"); - } - this.targetsToNodePaths = outputPaths; - } - - public void addDependency(JobPrototype dependency) { - this.dependencies.add(dependency); - } - - public CrunchJob getCrunchJob(Class jarClass, Configuration conf, Pipeline pipeline) throws IOException { - if (job == null) { - job = build(jarClass, conf, pipeline); - for (JobPrototype proto : dependencies) { - job.addDependingJob(proto.getCrunchJob(jarClass, conf, pipeline)); - } - } - return job; - } - - private CrunchJob build(Class jarClass, Configuration conf, Pipeline pipeline) throws IOException { - Job job = new Job(conf); - conf = job.getConfiguration(); - conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString()); - job.setJarByClass(jarClass); - - Set outputNodes = Sets.newHashSet(); - Set targets = targetsToNodePaths.keySet(); - Path outputPath = new Path(workingPath, "output"); - MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, - group == null); - for (Target target : targets) { - DoNode node = null; - for (NodePath nodePath : targetsToNodePaths.get(target)) { - if (node == null) { - PCollectionImpl collect = nodePath.tail(); - node = DoNode.createOutputNode(target.toString(), collect.getPType()); - outputHandler.configureNode(node, target); - } - outputNodes.add(walkPath(nodePath.descendingIterator(), node)); - } - } - - job.setMapperClass(CrunchMapper.class); - List inputNodes; - DoNode reduceNode = null; - if (group != null) { - job.setReducerClass(CrunchReducer.class); - List reduceNodes = Lists.newArrayList(outputNodes); - serialize(reduceNodes, conf, workingPath, NodeContext.REDUCE); - reduceNode = reduceNodes.get(0); - - if (combineFnTable != null) { - job.setCombinerClass(CrunchCombiner.class); - DoNode combinerInputNode = group.createDoNode(); - DoNode combineNode = combineFnTable.createDoNode(); - combineNode.addChild(group.getGroupingNode()); - combinerInputNode.addChild(combineNode); - serialize(ImmutableList.of(combinerInputNode), conf, workingPath, - NodeContext.COMBINE); - } - - group.configureShuffle(job); - - DoNode mapOutputNode = group.getGroupingNode(); - Set mapNodes = Sets.newHashSet(); - for (NodePath nodePath : mapNodePaths) { - // Advance these one step, since we've already configured - // the grouping node, and the PGroupedTableImpl is the tail - // of the NodePath. - Iterator> iter = nodePath.descendingIterator(); - iter.next(); - mapNodes.add(walkPath(iter, mapOutputNode)); - } - inputNodes = Lists.newArrayList(mapNodes); - } else { // No grouping - job.setNumReduceTasks(0); - inputNodes = Lists.newArrayList(outputNodes); - } - serialize(inputNodes, conf, workingPath, NodeContext.MAP); - - if (inputNodes.size() == 1) { - DoNode inputNode = inputNodes.get(0); - inputNode.getSource().configureSource(job, -1); - } else { - for (int i = 0; i < inputNodes.size(); i++) { - DoNode inputNode = inputNodes.get(i); - inputNode.getSource().configureSource(job, i); - } - job.setInputFormatClass(CrunchInputFormat.class); - } - job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode)); - - return new CrunchJob(job, outputPath, outputHandler); - } - - private void serialize(List nodes, Configuration conf, Path workingPath, - NodeContext context) throws IOException { - List rtNodes = Lists.newArrayList(); - for (DoNode node : nodes) { - rtNodes.add(node.toRTNode(true, conf, context)); - } - Path path = new Path(workingPath, context.toString()); - DistCache.write(conf, path, rtNodes); - } - - private String createJobName(String pipelineName, List mapNodes, DoNode reduceNode) { - JobNameBuilder builder = new JobNameBuilder(pipelineName); - builder.visit(mapNodes); - if (reduceNode != null) { - builder.visit(reduceNode); - } - return builder.build(); - } - - private DoNode walkPath(Iterator> iter, DoNode working) { - while (iter.hasNext()) { - PCollectionImpl collect = iter.next(); - if (combineFnTable != null && - !(collect instanceof PGroupedTableImpl)) { - combineFnTable = null; - } else if (collect instanceof DoTableImpl && - ((DoTableImpl) collect).hasCombineFn()) { - combineFnTable = (DoTableImpl) collect; - } - if (!nodes.containsKey(collect)) { - nodes.put(collect, collect.createDoNode()); - } - DoNode parent = nodes.get(collect); - parent.addChild(working); - working = parent; - } - return working; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java b/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java deleted file mode 100644 index af193f4..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.plan; - -import java.util.List; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; - -import org.apache.crunch.Target; -import org.apache.crunch.io.MapReduceTarget; -import org.apache.crunch.io.OutputHandler; -import org.apache.crunch.io.PathTarget; -import org.apache.crunch.types.PType; -import com.google.common.collect.Lists; - -public class MSCROutputHandler implements OutputHandler { - - private final Job job; - private final Path path; - private final boolean mapOnlyJob; - - private DoNode workingNode; - private List multiPaths; - - public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) { - this.job = job; - this.path = outputPath; - this.mapOnlyJob = mapOnlyJob; - this.multiPaths = Lists.newArrayList(); - } - - public void configureNode(DoNode node, Target target) { - workingNode = node; - target.accept(this, node.getPType()); - } - - public boolean configure(Target target, PType ptype) { - if (target instanceof MapReduceTarget && target instanceof PathTarget) { - String name = PlanningParameters.MULTI_OUTPUT_PREFIX + multiPaths.size(); - multiPaths.add(((PathTarget) target).getPath()); - workingNode.setOutputName(name); - ((MapReduceTarget) target).configureForMapReduce(job, ptype, path, name); - return true; - } - if (target instanceof MapReduceTarget) { - ((MapReduceTarget) target).configureForMapReduce(job, ptype, null, null); - return true; - } - return false; - } - - public boolean isMapOnlyJob() { - return mapOnlyJob; - } - - public List getMultiPaths() { - return multiPaths; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java deleted file mode 100644 index 29aeafd..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ /dev/null @@ -1,376 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.plan; - -import java.io.IOException; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -import org.apache.hadoop.conf.Configuration; - -import org.apache.crunch.Source; -import org.apache.crunch.SourceTarget; -import org.apache.crunch.Target; -import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.impl.mr.collect.DoCollectionImpl; -import org.apache.crunch.impl.mr.collect.DoTableImpl; -import org.apache.crunch.impl.mr.collect.InputCollection; -import org.apache.crunch.impl.mr.collect.PCollectionImpl; -import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; -import org.apache.crunch.impl.mr.collect.UnionCollection; -import org.apache.crunch.impl.mr.exec.MRExecutor; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -public class MSCRPlanner { - - // Used to ensure that we always build pipelines starting from the deepest outputs, which - // helps ensure that we handle intermediate outputs correctly. - private static final Comparator> DEPTH_COMPARATOR = new Comparator>() { - @Override - public int compare(PCollectionImpl left, PCollectionImpl right) { - int cmp = right.getDepth() - left.getDepth(); - if (cmp == 0){ - // Ensure we don't throw away two output collections at the same depth. - // Using the collection name would be nicer here, but names aren't necessarily unique - cmp = new Integer(right.hashCode()).compareTo(left.hashCode()); - } - return cmp; - } - }; - - private final MRPipeline pipeline; - private final Map, Set> outputs; - - public MSCRPlanner(MRPipeline pipeline, - Map, Set> outputs) { - this.pipeline = pipeline; - this.outputs = new TreeMap, Set>(DEPTH_COMPARATOR); - this.outputs.putAll(outputs); - } - - public MRExecutor plan(Class jarClass, Configuration conf) - throws IOException { - // Constructs all of the node paths, which either start w/an input - // or a GBK and terminate in an output collection of any type. - NodeVisitor visitor = new NodeVisitor(); - for (PCollectionImpl output : outputs.keySet()) { - visitor.visitOutput(output); - } - - // Pull out the node paths. - Map, Set> nodePaths = visitor.getNodePaths(); - - // Keeps track of the dependencies from collections -> jobs and then - // between different jobs. - Map, JobPrototype> assignments = Maps.newHashMap(); - Map, Set> jobDependencies = - new HashMap, Set>(); - - // Find the set of GBKs that DO NOT depend on any other GBK. - Set> workingGroupings = null; - while (!(workingGroupings = getWorkingGroupings(nodePaths)).isEmpty()) { - - for (PGroupedTableImpl grouping : workingGroupings) { - Set mapInputPaths = nodePaths.get(grouping); - JobPrototype proto = JobPrototype.createMapReduceJob(grouping, - mapInputPaths, pipeline.createTempPath()); - assignments.put(grouping, proto); - if (jobDependencies.containsKey(grouping)) { - for (JobPrototype dependency : jobDependencies.get(grouping)) { - proto.addDependency(dependency); - } - } - } - - Map, Set> dependencyPaths = getDependencyPaths( - workingGroupings, nodePaths); - for (Map.Entry, Set> entry : dependencyPaths.entrySet()) { - PGroupedTableImpl grouping = entry.getKey(); - Set currentNodePaths = entry.getValue(); - - JobPrototype proto = assignments.get(grouping); - Set gbkPaths = Sets.newHashSet(); - for (NodePath nodePath : currentNodePaths) { - PCollectionImpl tail = nodePath.tail(); - if (tail instanceof PGroupedTableImpl) { - gbkPaths.add(nodePath); - if (!jobDependencies.containsKey(tail)) { - jobDependencies.put(tail, Sets.newHashSet()); - } - jobDependencies.get(tail).add(proto); - } - } - - if (!gbkPaths.isEmpty()) { - handleGroupingDependencies(gbkPaths, currentNodePaths); - } - - // At this point, all of the dependencies for the working groups will be - // file outputs, and so we can add them all to the JobPrototype-- we now have - // a complete job. - HashMultimap reduceOutputs = HashMultimap.create(); - for (NodePath nodePath : currentNodePaths) { - assignments.put(nodePath.tail(), proto); - for (Target target : outputs.get(nodePath.tail())) { - reduceOutputs.put(target, nodePath); - } - } - proto.addReducePaths(reduceOutputs); - - // We've processed this GBK-- remove it from the set of nodePaths we - // need to process in the next step. - nodePaths.remove(grouping); - } - } - - // Process any map-only jobs that are remaining. - if (!nodePaths.isEmpty()) { - for (Map.Entry, Set> entry : nodePaths - .entrySet()) { - PCollectionImpl collect = entry.getKey(); - if (!assignments.containsKey(collect)) { - HashMultimap mapOutputs = HashMultimap.create(); - for (NodePath nodePath : entry.getValue()) { - for (Target target : outputs.get(nodePath.tail())) { - mapOutputs.put(target, nodePath); - } - } - JobPrototype proto = JobPrototype.createMapOnlyJob(mapOutputs, - pipeline.createTempPath()); - - if (jobDependencies.containsKey(collect)) { - for (JobPrototype dependency : jobDependencies.get(collect)) { - proto.addDependency(dependency); - } - } - assignments.put(collect, proto); - } - } - } - - MRExecutor exec = new MRExecutor(jarClass); - for (JobPrototype proto : Sets.newHashSet(assignments.values())) { - exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline)); - } - return exec; - } - - private Map, Set> getDependencyPaths( - Set> workingGroupings, - Map, Set> nodePaths) { - Map, Set> dependencyPaths = Maps.newHashMap(); - for (PGroupedTableImpl grouping : workingGroupings) { - dependencyPaths.put(grouping, Sets. newHashSet()); - } - - // Find the targets that depend on one of the elements of the current - // working group. - for (PCollectionImpl target : nodePaths.keySet()) { - if (!workingGroupings.contains(target)) { - for (NodePath nodePath : nodePaths.get(target)) { - if (workingGroupings.contains(nodePath.head())) { - dependencyPaths.get(nodePath.head()).add(nodePath); - } - } - } - } - return dependencyPaths; - } - - private int getSplitIndex(Set currentNodePaths) { - List>> iters = Lists.newArrayList(); - for (NodePath nodePath : currentNodePaths) { - Iterator> iter = nodePath.iterator(); - iter.next(); // prime this past the initial NGroupedTableImpl - iters.add(iter); - } - - // Find the lowest point w/the lowest cost to be the split point for - // all of the dependent paths. - boolean end = false; - int splitIndex = -1; - while (!end) { - splitIndex++; - PCollectionImpl current = null; - for (Iterator> iter : iters) { - if (iter.hasNext()) { - PCollectionImpl next = iter.next(); - if (next instanceof PGroupedTableImpl) { - end = true; - break; - } else if (current == null) { - current = next; - } else if (current != next) { - end = true; - break; - } - } else { - end = true; - break; - } - } - } - // TODO: Add costing calcs here. - return splitIndex; - } - - private void handleGroupingDependencies(Set gbkPaths, - Set currentNodePaths) throws IOException { - int splitIndex = getSplitIndex(currentNodePaths); - PCollectionImpl splitTarget = currentNodePaths.iterator().next() - .get(splitIndex); - if (!outputs.containsKey(splitTarget)) { - outputs.put(splitTarget, Sets.newHashSet()); - } - - SourceTarget srcTarget = null; - Target targetToReplace = null; - for (Target t : outputs.get(splitTarget)) { - if (t instanceof SourceTarget) { - srcTarget = (SourceTarget) t; - break; - } else { - srcTarget = t.asSourceTarget(splitTarget.getPType()); - if (srcTarget != null) { - targetToReplace = t; - break; - } - } - } - if (targetToReplace != null) { - outputs.get(splitTarget).remove(targetToReplace); - } else if (srcTarget == null) { - srcTarget = pipeline.createIntermediateOutput(splitTarget.getPType()); - } - outputs.get(splitTarget).add(srcTarget); - splitTarget.materializeAt(srcTarget); - - PCollectionImpl inputNode = (PCollectionImpl) pipeline.read(srcTarget); - Set nextNodePaths = Sets.newHashSet(); - for (NodePath nodePath : currentNodePaths) { - if (gbkPaths.contains(nodePath)) { - nextNodePaths.add(nodePath.splitAt(splitIndex, inputNode)); - } else { - nextNodePaths.add(nodePath); - } - } - currentNodePaths.clear(); - currentNodePaths.addAll(nextNodePaths); - } - - private Set> getWorkingGroupings( - Map, Set> nodePaths) { - Set> gbks = Sets.newHashSet(); - for (PCollectionImpl target : nodePaths.keySet()) { - if (target instanceof PGroupedTableImpl) { - boolean hasGBKDependency = false; - for (NodePath nodePath : nodePaths.get(target)) { - if (nodePath.head() instanceof PGroupedTableImpl) { - hasGBKDependency = true; - break; - } - } - if (!hasGBKDependency) { - gbks.add((PGroupedTableImpl) target); - } - } - } - return gbks; - } - - private static class NodeVisitor implements PCollectionImpl.Visitor { - - private final Map, Set> nodePaths; - private final Map, Source> inputs; - private PCollectionImpl workingNode; - private NodePath workingPath; - - public NodeVisitor() { - this.nodePaths = new HashMap, Set>(); - this.inputs = new HashMap, Source>(); - } - - public Map, Set> getNodePaths() { - return nodePaths; - } - - public void visitOutput(PCollectionImpl output) { - nodePaths.put(output, Sets. newHashSet()); - workingNode = output; - workingPath = new NodePath(); - output.accept(this); - } - - @Override - public void visitInputCollection(InputCollection collection) { - workingPath.close(collection); - inputs.put(collection, collection.getSource()); - nodePaths.get(workingNode).add(workingPath); - } - - @Override - public void visitUnionCollection(UnionCollection collection) { - PCollectionImpl baseNode = workingNode; - NodePath basePath = workingPath; - for (PCollectionImpl parent : collection.getParents()) { - workingPath = new NodePath(basePath); - workingNode = baseNode; - processParent(parent); - } - } - - @Override - public void visitDoFnCollection(DoCollectionImpl collection) { - workingPath.push(collection); - processParent(collection.getOnlyParent()); - } - - @Override - public void visitDoTable(DoTableImpl collection) { - workingPath.push(collection); - processParent(collection.getOnlyParent()); - } - - @Override - public void visitGroupedTable(PGroupedTableImpl collection) { - workingPath.close(collection); - nodePaths.get(workingNode).add(workingPath); - workingNode = collection; - nodePaths.put(workingNode, Sets. newHashSet()); - workingPath = new NodePath(collection); - processParent(collection.getOnlyParent()); - } - - private void processParent(PCollectionImpl parent) { - if (!nodePaths.containsKey(parent)) { - parent.accept(this); - } else { - workingPath.close(parent); - nodePaths.get(workingNode).add(workingPath); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java deleted file mode 100644 index 4ab30ca..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.plan; - -import java.util.Iterator; -import java.util.LinkedList; - -import org.apache.crunch.impl.mr.collect.PCollectionImpl; -import com.google.common.collect.Lists; - -class NodePath implements Iterable> { - private LinkedList> path; - - public NodePath() { - this.path = Lists.newLinkedList(); - } - - public NodePath(PCollectionImpl tail) { - this.path = Lists.newLinkedList(); - this.path.add(tail); - } - - public NodePath(NodePath other) { - this.path = Lists.newLinkedList(other.path); - } - - public void push(PCollectionImpl stage) { - this.path.push((PCollectionImpl) stage); - } - - public void close(PCollectionImpl head) { - this.path.push(head); - } - - public Iterator> iterator() { - return path.iterator(); - } - - public Iterator> descendingIterator() { - return path.descendingIterator(); - } - - public PCollectionImpl get(int index) { - return path.get(index); - } - - public PCollectionImpl head() { - return path.peekFirst(); - } - - public PCollectionImpl tail() { - return path.peekLast(); - } - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof NodePath)) { - return false; - } - NodePath nodePath = (NodePath) other; - return path.equals(nodePath.path); - } - - @Override - public int hashCode() { - return 17 + 37 * path.hashCode(); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - for (PCollectionImpl collect : path) { - sb.append(collect.getName() + "|"); - } - sb.deleteCharAt(sb.length() - 1); - return sb.toString(); - } - - public NodePath splitAt(int splitIndex, PCollectionImpl newHead) { - NodePath top = new NodePath(); - for (int i = 0; i <= splitIndex; i++) { - top.path.add(path.get(i)); - } - LinkedList> nextPath = Lists.newLinkedList(); - nextPath.add(newHead); - nextPath.addAll(path.subList(splitIndex + 1, path.size())); - path = nextPath; - return top; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java deleted file mode 100644 index b8e59a3..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.plan; - -public class PlanningParameters { - - public static final String MULTI_OUTPUT_PREFIX = "out"; - - public static final String CRUNCH_WORKING_DIRECTORY = "crunch.work.dir"; - - private PlanningParameters() {} -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java deleted file mode 100644 index fcfbc36..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -public class CrunchCombiner extends CrunchReducer { - - @Override - protected NodeContext getNodeContext() { - return NodeContext.COMBINE; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java deleted file mode 100644 index 6b49735..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.util.ReflectionUtils; - -import org.apache.crunch.io.impl.InputBundle; -import com.google.common.collect.Lists; - -public class CrunchInputFormat extends InputFormat { - - @Override - public List getSplits(JobContext job) throws IOException, - InterruptedException { - List splits = Lists.newArrayList(); - Configuration conf = job.getConfiguration(); - Map>> formatNodeMap = CrunchInputs.getFormatNodeMap(job); - - // First, build a map of InputFormats to Paths - for (Map.Entry>> entry : formatNodeMap.entrySet()) { - InputBundle inputBundle = entry.getKey(); - Job jobCopy = new Job(conf); - InputFormat format = (InputFormat) ReflectionUtils.newInstance( - inputBundle.getInputFormatClass(), jobCopy.getConfiguration()); - for (Map.Entry> nodeEntry : entry.getValue() - .entrySet()) { - Integer nodeIndex = nodeEntry.getKey(); - List paths = nodeEntry.getValue(); - FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths.size()])); - - // Get splits for each input path and tag with InputFormat - // and Mapper types by wrapping in a TaggedInputSplit. - List pathSplits = format.getSplits(jobCopy); - for (InputSplit pathSplit : pathSplits) { - splits.add(new CrunchInputSplit(pathSplit, inputBundle.getInputFormatClass(), - inputBundle.getExtraConfiguration(), nodeIndex, jobCopy.getConfiguration())); - } - } - } - return splits; - } - - @Override - public RecordReader createRecordReader(InputSplit inputSplit, - TaskAttemptContext context) throws IOException, InterruptedException { - return new CrunchRecordReader(inputSplit, context); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java deleted file mode 100644 index b57ca58..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Map; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.io.serializer.Serializer; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.util.ReflectionUtils; - -public class CrunchInputSplit extends InputSplit implements Configurable, Writable { - - private InputSplit inputSplit; - private Class inputFormatClass; - private Map extraConf; - private int nodeIndex; - private Configuration conf; - - public CrunchInputSplit() { - // default constructor - } - - public CrunchInputSplit(InputSplit inputSplit, - Class inputFormatClass, Map extraConf, - int nodeIndex, Configuration conf) { - this.inputSplit = inputSplit; - this.inputFormatClass = inputFormatClass; - this.extraConf = extraConf; - this.nodeIndex = nodeIndex; - this.conf = conf; - } - - public int getNodeIndex() { - return nodeIndex; - } - - public InputSplit getInputSplit() { - return inputSplit; - } - - public Class getInputFormatClass() { - return inputFormatClass; - } - - @Override - public long getLength() throws IOException, InterruptedException { - return inputSplit.getLength(); - } - - @Override - public String[] getLocations() throws IOException, InterruptedException { - return inputSplit.getLocations(); - } - - public void readFields(DataInput in) throws IOException { - nodeIndex = in.readInt(); - int extraConfSize = in.readInt(); - if (extraConfSize > 0) { - for (int i = 0; i < extraConfSize; i++) { - conf.set(in.readUTF(), in.readUTF()); - } - } - inputFormatClass = (Class>) readClass(in); - Class inputSplitClass = (Class) readClass(in); - inputSplit = (InputSplit) ReflectionUtils - .newInstance(inputSplitClass, conf); - SerializationFactory factory = new SerializationFactory(conf); - Deserializer deserializer = factory.getDeserializer(inputSplitClass); - deserializer.open((DataInputStream) in); - inputSplit = (InputSplit) deserializer.deserialize(inputSplit); - } - - private Class readClass(DataInput in) throws IOException { - String className = Text.readString(in); - try { - return conf.getClassByName(className); - } catch (ClassNotFoundException e) { - throw new RuntimeException("readObject can't find class", e); - } - } - - public void write(DataOutput out) throws IOException { - out.writeInt(nodeIndex); - out.writeInt(extraConf.size()); - for (Map.Entry e : extraConf.entrySet()) { - out.writeUTF(e.getKey()); - out.writeUTF(e.getValue()); - } - Text.writeString(out, inputFormatClass.getName()); - Text.writeString(out, inputSplit.getClass().getName()); - SerializationFactory factory = new SerializationFactory(conf); - Serializer serializer = factory.getSerializer(inputSplit.getClass()); - serializer.open((DataOutputStream) out); - serializer.serialize(inputSplit); - } - - public Configuration getConf() { - return conf; - } - - public void setConf(Configuration conf) { - this.conf = conf; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java deleted file mode 100644 index 8fa1d56..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; - -import org.apache.crunch.io.impl.InputBundle; -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -public class CrunchInputs { - - private static final char RECORD_SEP = ','; - private static final char FIELD_SEP = ';'; - private static final Joiner JOINER = Joiner.on(FIELD_SEP); - private static final Splitter SPLITTER = Splitter.on(FIELD_SEP); - - public static void addInputPath(Job job, Path path, - InputBundle inputBundle, int nodeIndex) { - Configuration conf = job.getConfiguration(); - String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString()); - String existing = conf.get(RuntimeParameters.MULTI_INPUTS); - conf.set(RuntimeParameters.MULTI_INPUTS, existing == null ? inputs : existing + RECORD_SEP - + inputs); - } - - public static Map>> getFormatNodeMap( - JobContext job) { - Map>> formatNodeMap = Maps.newHashMap(); - Configuration conf = job.getConfiguration(); - for (String input : Splitter.on(RECORD_SEP).split(conf.get(RuntimeParameters.MULTI_INPUTS))) { - List fields = Lists.newArrayList(SPLITTER.split(input)); - InputBundle inputBundle = InputBundle.fromSerialized(fields.get(0)); - if (!formatNodeMap.containsKey(inputBundle)) { - formatNodeMap.put(inputBundle, Maps.> newHashMap()); - } - Integer nodeIndex = Integer.valueOf(fields.get(1)); - if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) { - formatNodeMap.get(inputBundle).put(nodeIndex, - Lists. newLinkedList()); - } - formatNodeMap.get(inputBundle).get(nodeIndex) - .add(new Path(fields.get(2))); - } - return formatNodeMap; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java deleted file mode 100644 index 814c8c3..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapreduce.Mapper; - -public class CrunchMapper extends Mapper { - - private static final Log LOG = LogFactory.getLog(CrunchMapper.class); - - private RTNode node; - private CrunchTaskContext ctxt; - private boolean debug; - - @Override - protected void setup(Mapper.Context context) { - List nodes; - this.ctxt = new CrunchTaskContext(context, NodeContext.MAP); - try { - nodes = ctxt.getNodes(); - } catch (IOException e) { - LOG.info("Crunch deserialization error", e); - throw new CrunchRuntimeException(e); - } - if (nodes.size() == 1) { - this.node = nodes.get(0); - } else { - CrunchInputSplit split = (CrunchInputSplit) context.getInputSplit(); - this.node = nodes.get(split.getNodeIndex()); - } - this.debug = ctxt.isDebugRun(); - } - - @Override - protected void map(Object k, Object v, - Mapper.Context context) { - if (debug) { - try { - node.process(k, v); - } catch (Exception e) { - LOG.error("Mapper exception", e); - } - } else { - node.process(k, v); - } - } - - @Override - protected void cleanup(Mapper.Context context) { - node.cleanup(); - ctxt.cleanup(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java deleted file mode 100644 index 5967aa9..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -import java.io.IOException; - -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.util.ReflectionUtils; - -class CrunchRecordReader extends RecordReader { - - private final RecordReader delegate; - - public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) - throws IOException, InterruptedException { - CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit; - InputFormat inputFormat = (InputFormat) ReflectionUtils - .newInstance(crunchSplit.getInputFormatClass(), crunchSplit.getConf()); - this.delegate = inputFormat.createRecordReader( - crunchSplit.getInputSplit(), TaskAttemptContextFactory.create( - crunchSplit.getConf(), context.getTaskAttemptID())); - } - - @Override - public void close() throws IOException { - delegate.close(); - } - - @Override - public K getCurrentKey() throws IOException, InterruptedException { - return delegate.getCurrentKey(); - } - - @Override - public V getCurrentValue() throws IOException, InterruptedException { - return delegate.getCurrentValue(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return delegate.getProgress(); - } - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext context) - throws IOException, InterruptedException { - CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit; - InputSplit delegateSplit = crunchSplit.getInputSplit(); - delegate.initialize(delegateSplit, TaskAttemptContextFactory.create( - crunchSplit.getConf(), context.getTaskAttemptID())); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return delegate.nextKeyValue(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java deleted file mode 100644 index aa5fc95..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapreduce.Reducer; - -public class CrunchReducer extends Reducer { - - private static final Log LOG = LogFactory.getLog(CrunchReducer.class); - - private RTNode node; - private CrunchTaskContext ctxt; - private boolean debug; - - protected NodeContext getNodeContext() { - return NodeContext.REDUCE; - } - - @Override - protected void setup(Reducer.Context context) { - this.ctxt = new CrunchTaskContext(context, getNodeContext()); - try { - List nodes = ctxt.getNodes(); - this.node = nodes.get(0); - } catch (IOException e) { - LOG.info("Crunch deserialization error", e); - throw new CrunchRuntimeException(e); - } - this.debug = ctxt.isDebugRun(); - } - - @Override - protected void reduce(Object key, Iterable values, - Reducer.Context context) { - if (debug) { - try { - node.processIterable(key, values); - } catch (Exception e) { - LOG.error("Reducer exception", e); - } - } else { - node.processIterable(key, values); - } - } - - @Override - protected void cleanup(Reducer.Context context) { - node.cleanup(); - ctxt.cleanup(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java deleted file mode 100644 index 9eac51c..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -public class CrunchRuntimeException extends RuntimeException { - - private boolean logged = false; - - public CrunchRuntimeException(String msg) { - super(msg); - } - - public CrunchRuntimeException(Exception e) { - super(e); - } - - public CrunchRuntimeException(String msg, Exception e) { - super(msg, e); - } - - public boolean wasLogged() { - return logged; - } - - public void markLogged() { - this.logged = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java deleted file mode 100644 index dd04813..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.TaskInputOutputContext; -import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs; - -import org.apache.crunch.impl.mr.plan.PlanningParameters; -import org.apache.crunch.util.DistCache; - -public class CrunchTaskContext { - - private final TaskInputOutputContext taskContext; - private final NodeContext nodeContext; - private CrunchMultipleOutputs multipleOutputs; - - public CrunchTaskContext( - TaskInputOutputContext taskContext, - NodeContext nodeContext) { - this.taskContext = taskContext; - this.nodeContext = nodeContext; - } - - public TaskInputOutputContext getContext() { - return taskContext; - } - - public NodeContext getNodeContext() { - return nodeContext; - } - - public List getNodes() throws IOException { - Configuration conf = taskContext.getConfiguration(); - Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString()); - @SuppressWarnings("unchecked") - List nodes = (List) DistCache.read(conf, path); - if (nodes != null) { - for (RTNode node : nodes) { - node.initialize(this); - } - } - return nodes; - } - - public boolean isDebugRun() { - Configuration conf = taskContext.getConfiguration(); - return conf.getBoolean(RuntimeParameters.DEBUG, false); - } - - public void cleanup() { - if (multipleOutputs != null) { - try { - multipleOutputs.close(); - } catch (IOException e) { - throw new CrunchRuntimeException(e); - } catch (InterruptedException e) { - throw new CrunchRuntimeException(e); - } - } - } - - public CrunchMultipleOutputs getMultipleOutputs() { - if (multipleOutputs == null) { - multipleOutputs = new CrunchMultipleOutputs(taskContext); - } - return multipleOutputs; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java b/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java deleted file mode 100644 index 648e87c..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -import org.apache.crunch.impl.mr.plan.DoNode; - -/** - * Enum that is associated with a serialized {@link DoNode} instance, so we know - * how to use it within the context of a particular MR job. - * - */ -public enum NodeContext { - MAP, REDUCE, COMBINE; - - public String getConfigurationKey() { - return "crunch.donode." + toString().toLowerCase(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java deleted file mode 100644 index 4debfc1..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -import java.io.Serializable; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.crunch.DoFn; -import org.apache.crunch.Emitter; -import org.apache.crunch.impl.mr.emit.IntermediateEmitter; -import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter; -import org.apache.crunch.impl.mr.emit.OutputEmitter; -import org.apache.crunch.types.Converter; - -public class RTNode implements Serializable { - - private static final Log LOG = LogFactory.getLog(RTNode.class); - - private final String nodeName; - private DoFn fn; - private final List children; - private final Converter inputConverter; - private final Converter outputConverter; - private final String outputName; - - private transient Emitter emitter; - - public RTNode(DoFn fn, String name, List children, - Converter inputConverter, Converter outputConverter, String outputName) { - this.fn = fn; - this.nodeName = name; - this.children = children; - this.inputConverter = inputConverter; - this.outputConverter = outputConverter; - this.outputName = outputName; - } - - public void initialize(CrunchTaskContext ctxt) { - if (emitter != null) { - // Already initialized - return; - } - - fn.setContext(ctxt.getContext()); - for (RTNode child : children) { - child.initialize(ctxt); - } - - if (outputConverter != null) { - if (outputName != null) { - this.emitter = new MultipleOutputEmitter( - outputConverter, ctxt.getMultipleOutputs(), outputName); - } else { - this.emitter = new OutputEmitter( - outputConverter, ctxt.getContext()); - } - } else if (!children.isEmpty()) { - this.emitter = new IntermediateEmitter(children); - } else { - throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName); - } - } - - public boolean isLeafNode() { - return outputConverter != null && children.isEmpty(); - } - - public void process(Object input) { - try { - fn.process(input, emitter); - } catch (CrunchRuntimeException e) { - if (!e.wasLogged()) { - LOG.info(String.format("Crunch exception in '%s' for input: %s", - nodeName, input.toString()), e); - e.markLogged(); - } - throw e; - } - } - - public void process(Object key, Object value) { - process(inputConverter.convertInput(key, value)); - } - - public void processIterable(Object key, Iterable values) { - process(inputConverter.convertIterableInput(key, values)); - } - - public void cleanup() { - fn.cleanup(emitter); - emitter.flush(); - for (RTNode child : children) { - child.cleanup(); - } - } - - @Override - public String toString() { - return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children=" - + children + ", inputConverter=" + inputConverter - + ", outputConverter=" + outputConverter + ", outputName=" + outputName - + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java deleted file mode 100644 index 5e534eb..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -/** - * Parameters used during the runtime execution. - * - */ -public class RuntimeParameters { - - public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets"; - - public static final String MULTI_INPUTS = "crunch.inputs.dir"; - - public static final String DEBUG = "crunch.debug"; - - // Not instantiated - private RuntimeParameters() {} -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java b/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java deleted file mode 100644 index 2cfa615..0000000 --- a/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.run; - -import java.lang.reflect.Constructor; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; - -/** - * - */ -@SuppressWarnings("unchecked") -public class TaskAttemptContextFactory { - - private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class); - - private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory(); - - public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) { - return INSTANCE.createInternal(conf, taskAttemptId); - } - - private Constructor taskAttemptConstructor; - - private TaskAttemptContextFactory() { - Class implClass = TaskAttemptContext.class; - if (implClass.isInterface()) { - try { - implClass = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); - } catch (ClassNotFoundException e) { - LOG.fatal("Could not find TaskAttemptContextImpl class, exiting", e); - } - } - try { - this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class); - } catch (Exception e) { - LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e); - } - } - - private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) { - try { - return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId); - } catch (Exception e) { - LOG.error("Could not construct a TaskAttemptContext instance", e); - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/io/At.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/io/At.java b/src/main/java/org/apache/crunch/io/At.java deleted file mode 100644 index 2f5fe8b..0000000 --- a/src/main/java/org/apache/crunch/io/At.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.io; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.Scan; - -import org.apache.crunch.SourceTarget; -import org.apache.crunch.io.avro.AvroFileSourceTarget; -import org.apache.crunch.io.hbase.HBaseSourceTarget; -import org.apache.crunch.io.seq.SeqFileSourceTarget; -import org.apache.crunch.io.seq.SeqFileTableSourceTarget; -import org.apache.crunch.io.text.TextFileSourceTarget; -import org.apache.crunch.types.PType; -import org.apache.crunch.types.PTypeFamily; -import org.apache.crunch.types.avro.AvroType; -import org.apache.crunch.types.writable.Writables; - -/** - * Static factory methods for creating various {@link SourceTarget} types. - * - */ -public class At { - public static AvroFileSourceTarget avroFile(String pathName, AvroType avroType) { - return avroFile(new Path(pathName), avroType); - } - - public static AvroFileSourceTarget avroFile(Path path, AvroType avroType) { - return new AvroFileSourceTarget(path, avroType); - } - - public static HBaseSourceTarget hbaseTable(String table) { - return hbaseTable(table, new Scan()); - } - - public static HBaseSourceTarget hbaseTable(String table, Scan scan) { - return new HBaseSourceTarget(table, scan); - } - - public static SeqFileSourceTarget sequenceFile(String pathName, PType ptype) { - return sequenceFile(new Path(pathName), ptype); - } - - public static SeqFileSourceTarget sequenceFile(Path path, PType ptype) { - return new SeqFileSourceTarget(path, ptype); - } - - public static SeqFileTableSourceTarget sequenceFile(String pathName, PType keyType, - PType valueType) { - return sequenceFile(new Path(pathName), keyType, valueType); - } - - public static SeqFileTableSourceTarget sequenceFile(Path path, PType keyType, - PType valueType) { - PTypeFamily ptf = keyType.getFamily(); - return new SeqFileTableSourceTarget(path, ptf.tableOf(keyType, valueType)); - } - - public static TextFileSourceTarget textFile(String pathName) { - return textFile(new Path(pathName)); - } - - public static TextFileSourceTarget textFile(Path path) { - return textFile(path, Writables.strings()); - } - - public static TextFileSourceTarget textFile(String pathName, PType ptype) { - return textFile(new Path(pathName), ptype); - } - - public static TextFileSourceTarget textFile(Path path, PType ptype) { - return new TextFileSourceTarget(path, ptype); - } -}