Return-Path: X-Original-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D060BD559 for ; Sat, 7 Jul 2012 21:49:07 +0000 (UTC) Received: (qmail 33535 invoked by uid 500); 7 Jul 2012 21:49:07 -0000 Delivered-To: apmail-incubator-crunch-commits-archive@incubator.apache.org Received: (qmail 33439 invoked by uid 500); 7 Jul 2012 21:49:07 -0000 Mailing-List: contact crunch-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: crunch-dev@incubator.apache.org Delivered-To: mailing list crunch-commits@incubator.apache.org Received: (qmail 33355 invoked by uid 99); 7 Jul 2012 21:49:07 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 07 Jul 2012 21:49:07 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E92EAC8C0; Sat, 7 Jul 2012 21:49:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: crunch-commits@incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [10/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core Message-Id: <20120707214906.E92EAC8C0@tyr.zones.apache.org> Date: Sat, 7 Jul 2012 21:49:06 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/fn/MapValuesFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/fn/MapValuesFn.java b/src/main/java/org/apache/crunch/fn/MapValuesFn.java new file mode 100644 index 0000000..3c7065e --- /dev/null +++ b/src/main/java/org/apache/crunch/fn/MapValuesFn.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.Pair; + +public abstract class MapValuesFn extends DoFn, Pair> { + + @Override + public void process(Pair input, Emitter> emitter) { + emitter.emit(Pair.of(input.first(), map(input.second()))); + } + + public abstract V2 map(V1 v); +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/fn/PairMapFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/fn/PairMapFn.java b/src/main/java/org/apache/crunch/fn/PairMapFn.java new file mode 100644 index 0000000..2cfd17b --- /dev/null +++ b/src/main/java/org/apache/crunch/fn/PairMapFn.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.fn; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; +import org.apache.crunch.Pair; + +public class PairMapFn extends MapFn, Pair> { + + private MapFn keys; + private MapFn values; + + public PairMapFn(MapFn keys, MapFn values) { + this.keys = keys; + this.values = values; + } + + @Override + public void configure(Configuration conf) { + keys.configure(conf); + values.configure(conf); + } + + @Override + public void initialize() { + keys.setContext(getContext()); + values.setContext(getContext()); + } + + @Override + public Pair map(Pair input) { + return Pair.of(keys.map(input.first()), values.map(input.second())); + } + + @Override + public void cleanup(Emitter> emitter) { + keys.cleanup(null); + values.cleanup(null); + } + + @Override + public void setConfigurationForTest(Configuration conf) { + keys.setConfigurationForTest(conf); + values.setConfigurationForTest(conf); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java new file mode 100644 index 0000000..6305fcb --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mem; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.Source; +import org.apache.crunch.TableSource; +import org.apache.crunch.Target; +import org.apache.crunch.impl.mem.collect.MemCollection; +import org.apache.crunch.impl.mem.collect.MemTable; +import org.apache.crunch.io.At; +import org.apache.crunch.io.PathTarget; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class MemPipeline implements Pipeline { + + private static final Log LOG = LogFactory.getLog(MemPipeline.class); + + private static final MemPipeline INSTANCE = new MemPipeline(); + + public static Pipeline getInstance() { + return INSTANCE; + } + + public static PCollection collectionOf(T...ts) { + return new MemCollection(ImmutableList.copyOf(ts)); + } + + public static PCollection collectionOf(Iterable collect) { + return new MemCollection(collect); + } + + public static PCollection typedCollectionOf(PType ptype, T... ts) { + return new MemCollection(ImmutableList.copyOf(ts), ptype, null); + } + + public static PCollection typedCollectionOf(PType ptype, Iterable collect) { + return new MemCollection(collect, ptype, null); + } + + public static PTable tableOf(S s, T t, Object... more) { + List> pairs = Lists.newArrayList(); + pairs.add(Pair.of(s, t)); + for (int i = 0; i < more.length; i += 2) { + pairs.add(Pair.of((S) more[i], (T) more[i + 1])); + } + return new MemTable(pairs); + } + + public static PTable typedTableOf(PTableType ptype, S s, T t, Object... more) { + List> pairs = Lists.newArrayList(); + pairs.add(Pair.of(s, t)); + for (int i = 0; i < more.length; i += 2) { + pairs.add(Pair.of((S) more[i], (T) more[i + 1])); + } + return new MemTable(pairs, ptype, null); + } + + public static PTable tableOf(Iterable> pairs) { + return new MemTable(pairs); + } + + public static PTable typedTableOf(PTableType ptype, Iterable> pairs) { + return new MemTable(pairs, ptype, null); + } + + private Configuration conf = new Configuration(); + + private MemPipeline() { + } + + @Override + public void setConfiguration(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public PCollection read(Source source) { + if (source instanceof ReadableSource) { + try { + Iterable iterable = ((ReadableSource) source).read(conf); + return new MemCollection(iterable, source.getType(), source.toString()); + } catch (IOException e) { + LOG.error("Exception reading source: " + source.toString(), e); + throw new IllegalStateException(e); + } + } + LOG.error("Source " + source + " is not readable"); + throw new IllegalStateException("Source " + source + " is not readable"); + } + + @Override + public PTable read(TableSource source) { + if (source instanceof ReadableSource) { + try { + Iterable> iterable = ((ReadableSource>) source).read(conf); + return new MemTable(iterable, source.getTableType(), source.toString()); + } catch (IOException e) { + LOG.error("Exception reading source: " + source.toString(), e); + throw new IllegalStateException(e); + } + } + LOG.error("Source " + source + " is not readable"); + throw new IllegalStateException("Source " + source + " is not readable"); + } + + @Override + public void write(PCollection collection, Target target) { + if (target instanceof PathTarget) { + Path path = ((PathTarget) target).getPath(); + try { + FileSystem fs = FileSystem.get(conf); + FSDataOutputStream os = fs.create(new Path(path, "out")); + if (collection instanceof PTable) { + for (Object o : collection.materialize()) { + Pair p = (Pair) o; + os.writeBytes(p.first().toString()); + os.writeBytes("\t"); + os.writeBytes(p.second().toString()); + os.writeBytes("\r\n"); + } + } else { + for (Object o : collection.materialize()) { + os.writeBytes(o.toString() + "\r\n"); + } + } + os.close(); + } catch (IOException e) { + LOG.error("Exception writing target: " + target, e); + } + } else { + LOG.error("Target " + target + " is not a PathTarget instance"); + } + } + + @Override + public PCollection readTextFile(String pathName) { + return read(At.textFile(pathName)); + } + + @Override + public void writeTextFile(PCollection collection, String pathName) { + write(collection, At.textFile(pathName)); + } + + @Override + public Iterable materialize(PCollection pcollection) { + return pcollection.materialize(); + } + + @Override + public PipelineResult run() { + return PipelineResult.EMPTY; + } + + @Override + public PipelineResult done() { + return PipelineResult.EMPTY; + } + + @Override + public void enableDebug() { + LOG.info("Note: in-memory pipelines do not have debug logging"); + } + + @Override + public String getName() { + return "Memory Pipeline"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java new file mode 100644 index 0000000..7291e50 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mem.collect; + +import java.util.Collection; + +import org.apache.crunch.DoFn; +import org.apache.crunch.FilterFn; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Target; +import org.apache.crunch.fn.ExtractKeyFn; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.lib.Sample; +import org.apache.crunch.lib.Sort; +import org.apache.crunch.test.InMemoryEmitter; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + + +public class MemCollection implements PCollection { + + private final Collection collect; + private final PType ptype; + private String name; + + public MemCollection(Iterable collect) { + this(collect, null, null); + } + + public MemCollection(Iterable collect, PType ptype) { + this(collect, ptype, null); + } + + public MemCollection(Iterable collect, PType ptype, String name) { + this.collect = ImmutableList.copyOf(collect); + this.ptype = ptype; + this.name = name; + } + + @Override + public Pipeline getPipeline() { + return MemPipeline.getInstance(); + } + + @Override + public PCollection union(PCollection... collections) { + Collection output = Lists.newArrayList(); + for (PCollection pcollect : collections) { + for (S s : pcollect.materialize()) { + output.add(s); + } + } + output.addAll(collect); + return new MemCollection(output, collections[0].getPType()); + } + + @Override + public PCollection parallelDo(DoFn doFn, PType type) { + return parallelDo(null, doFn, type); + } + + @Override + public PCollection parallelDo(String name, DoFn doFn, PType type) { + InMemoryEmitter emitter = new InMemoryEmitter(); + doFn.initialize(); + for (S s : collect) { + doFn.process(s, emitter); + } + doFn.cleanup(emitter); + return new MemCollection(emitter.getOutput(), type, name); + } + + @Override + public PTable parallelDo(DoFn> doFn, PTableType type) { + return parallelDo(null, doFn, type); + } + + @Override + public PTable parallelDo(String name, DoFn> doFn, + PTableType type) { + InMemoryEmitter> emitter = new InMemoryEmitter>(); + doFn.initialize(); + for (S s : collect) { + doFn.process(s, emitter); + } + doFn.cleanup(emitter); + return new MemTable(emitter.getOutput(), type, name); + } + + @Override + public PCollection write(Target target) { + getPipeline().write(this, target); + return this; + } + + @Override + public Iterable materialize() { + return collect; + } + + public Collection getCollection() { + return collect; + } + + @Override + public PType getPType() { + return ptype; + } + + @Override + public PTypeFamily getTypeFamily() { + if (ptype != null) { + return ptype.getFamily(); + } + return null; + } + + @Override + public long getSize() { + return collect.size(); + } + + @Override + public String getName() { + return name; + } + + @Override + public String toString() { + return collect.toString(); + } + + @Override + public PTable count() { + return Aggregate.count(this); + } + + @Override + public PCollection sample(double acceptanceProbability) { + return Sample.sample(this, acceptanceProbability); + } + + @Override + public PCollection sample(double acceptanceProbability, long seed) { + return Sample.sample(this, seed, acceptanceProbability); + } + + @Override + public PCollection max() { + return Aggregate.max(this); + } + + @Override + public PCollection min() { + return Aggregate.min(this); + } + + @Override + public PCollection sort(boolean ascending) { + return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING); + } + + @Override + public PCollection filter(FilterFn filterFn) { + return parallelDo(filterFn, getPType()); + } + + @Override + public PCollection filter(String name, FilterFn filterFn) { + return parallelDo(name, filterFn, getPType()); + } + + @Override + public PTable by(MapFn mapFn, PType keyType) { + return parallelDo(new ExtractKeyFn(mapFn), getTypeFamily().tableOf(keyType, getPType())); + } + + @Override + public PTable by(String name, MapFn mapFn, PType keyType) { + return parallelDo(name, new ExtractKeyFn(mapFn), getTypeFamily().tableOf(keyType, getPType())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java new file mode 100644 index 0000000..4e114ab --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mem.collect; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.util.ReflectionUtils; + +import org.apache.crunch.CombineFn; +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Target; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +class MemGroupedTable extends MemCollection>> implements PGroupedTable { + + private final MemTable parent; + + private static Map> createMapFor(PType keyType, GroupingOptions options, Pipeline pipeline) { + if (options != null && options.getSortComparatorClass() != null) { + RawComparator rc = ReflectionUtils.newInstance(options.getSortComparatorClass(), + pipeline.getConfiguration()); + return new TreeMap>(rc); + } else if (keyType != null && Comparable.class.isAssignableFrom(keyType.getTypeClass())) { + return new TreeMap>(); + } + return Maps.newHashMap(); + } + + private static Iterable>> buildMap(MemTable parent, GroupingOptions options) { + PType keyType = parent.getKeyType(); + Map> map = createMapFor(keyType, options, parent.getPipeline()); + + for (Pair pair : parent.materialize()) { + S key = pair.first(); + if (!map.containsKey(key)) { + map.put(key, Lists.newArrayList()); + } + map.get(key).add(pair.second()); + } + + List>> values = Lists.newArrayList(); + for (Map.Entry> e : map.entrySet()) { + values.add(Pair.of(e.getKey(), (Iterable) e.getValue())); + } + return values; + } + + public MemGroupedTable(MemTable parent, GroupingOptions options) { + super(buildMap(parent, options)); + this.parent = parent; + } + + @Override + public PCollection>> union( + PCollection>>... collections) { + throw new UnsupportedOperationException(); + } + + @Override + public PCollection>> write(Target target) { + getPipeline().write(this.ungroup(), target); + return this; + } + + @Override + public PType>> getPType() { + PTableType parentType = parent.getPTableType(); + if (parentType != null) { + return parentType.getGroupedTableType(); + } + return null; + } + + @Override + public PTypeFamily getTypeFamily() { + return parent.getTypeFamily(); + } + + @Override + public long getSize() { + return parent.getSize(); + } + + @Override + public String getName() { + return "MemGrouped(" + parent.getName() + ")"; + } + + @Override + public PTable combineValues(CombineFn combineFn) { + return parallelDo(combineFn, parent.getPTableType()); + } + + @Override + public PTable ungroup() { + return parent; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java new file mode 100644 index 0000000..53e7526 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mem.collect; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Target; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.lib.Cogroup; +import org.apache.crunch.lib.Join; +import org.apache.crunch.lib.PTables; +import org.apache.crunch.materialize.MaterializableMap; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import com.google.common.collect.Lists; + +public class MemTable extends MemCollection> implements PTable { + + private PTableType ptype; + + public MemTable(Iterable> collect) { + this(collect, null, null); + } + + public MemTable(Iterable> collect, PTableType ptype, String name) { + super(collect, ptype, name); + this.ptype = ptype; + } + + @Override + public PTable union(PTable... others) { + List> values = Lists.newArrayList(); + values.addAll(getCollection()); + for (PTable ptable : others) { + for (Pair p : ptable.materialize()) { + values.add(p); + } + } + return new MemTable(values, others[0].getPTableType(), null); + } + + @Override + public PGroupedTable groupByKey() { + return groupByKey(null); + } + + @Override + public PGroupedTable groupByKey(int numPartitions) { + return groupByKey(null); + } + + @Override + public PGroupedTable groupByKey(GroupingOptions options) { + return new MemGroupedTable(this, options); + } + + @Override + public PTable write(Target target) { + super.write(target); + return this; + } + + @Override + public PTableType getPTableType() { + return ptype; + } + + @Override + public PType getKeyType() { + if (ptype != null) { + return ptype.getKeyType(); + } + return null; + } + + @Override + public PType getValueType() { + if (ptype != null) { + return ptype.getValueType(); + } + return null; + } + + @Override + public PTable top(int count) { + return Aggregate.top(this, count, true); + } + + @Override + public PTable bottom(int count) { + return Aggregate.top(this, count, false); + } + + @Override + public PTable> collectValues() { + return Aggregate.collectValues(this); + } + + @Override + public PTable> join(PTable other) { + return Join.join(this, other); + } + + @Override + public PTable, Collection>> cogroup(PTable other) { + return Cogroup.cogroup(this, other); + } + + @Override + public PCollection keys() { + return PTables.keys(this); + } + + @Override + public PCollection values() { + return PTables.values(this); + } + + @Override + public Map materializeToMap() { + return new MaterializableMap(this.materialize()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java new file mode 100644 index 0000000..0b7d8d7 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr; + +import java.io.IOException; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.Source; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.TableSource; +import org.apache.crunch.Target; +import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.impl.mr.collect.InputCollection; +import org.apache.crunch.impl.mr.collect.InputTable; +import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; +import org.apache.crunch.impl.mr.collect.UnionCollection; +import org.apache.crunch.impl.mr.collect.UnionTable; +import org.apache.crunch.impl.mr.plan.MSCRPlanner; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.io.At; +import org.apache.crunch.io.ReadableSourceTarget; +import org.apache.crunch.materialize.MaterializableIterable; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.writable.WritableTypeFamily; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class MRPipeline implements Pipeline { + + private static final Log LOG = LogFactory.getLog(MRPipeline.class); + + private static final Random RANDOM = new Random(); + + private final Class jarClass; + private final String name; + private final Map, Set> outputTargets; + private final Map, MaterializableIterable> outputTargetsToMaterialize; + private final Path tempDirectory; + private int tempFileIndex; + private int nextAnonymousStageId; + + private Configuration conf; + + public MRPipeline(Class jarClass) throws IOException { + this(jarClass, new Configuration()); + } + + public MRPipeline(Class jarClass, String name) { + this(jarClass, name, new Configuration()); + } + + public MRPipeline(Class jarClass, Configuration conf) { + this(jarClass, jarClass.getName(), conf); + } + + public MRPipeline(Class jarClass, String name, Configuration conf) { + this.jarClass = jarClass; + this.name = name; + this.outputTargets = Maps.newHashMap(); + this.outputTargetsToMaterialize = Maps.newHashMap(); + this.conf = conf; + this.tempDirectory = createTempDirectory(conf); + this.tempFileIndex = 0; + this.nextAnonymousStageId = 0; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public void setConfiguration(Configuration conf) { + this.conf = conf; + } + + @Override + public PipelineResult run() { + MSCRPlanner planner = new MSCRPlanner(this, outputTargets); + PipelineResult res = null; + try { + res = planner.plan(jarClass, conf).execute(); + } catch (IOException e) { + LOG.error(e); + return PipelineResult.EMPTY; + } + for (PCollectionImpl c : outputTargets.keySet()) { + if (outputTargetsToMaterialize.containsKey(c)) { + MaterializableIterable iter = outputTargetsToMaterialize.get(c); + iter.materialize(); + c.materializeAt(iter.getSourceTarget()); + outputTargetsToMaterialize.remove(c); + } else { + boolean materialized = false; + for (Target t : outputTargets.get(c)) { + if (!materialized && t instanceof Source) { + c.materializeAt((SourceTarget) t); + materialized = true; + } + } + } + } + outputTargets.clear(); + return res; + } + + @Override + public PipelineResult done() { + PipelineResult res = null; + if (!outputTargets.isEmpty()) { + res = run(); + } + cleanup(); + return res; + } + + public PCollection read(Source source) { + return new InputCollection(source, this); + } + + public PTable read(TableSource source) { + return new InputTable(source, this); + } + + public PCollection readTextFile(String pathName) { + return read(At.textFile(pathName)); + } + + @SuppressWarnings("unchecked") + public void write(PCollection pcollection, Target target) { + if (pcollection instanceof PGroupedTableImpl) { + pcollection = ((PGroupedTableImpl) pcollection).ungroup(); + } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) { + pcollection = pcollection.parallelDo("UnionCollectionWrapper", + (MapFn) IdentityFn. getInstance(), pcollection.getPType()); + } + addOutput((PCollectionImpl) pcollection, target); + } + + private void addOutput(PCollectionImpl impl, Target target) { + if (!outputTargets.containsKey(impl)) { + outputTargets.put(impl, Sets. newHashSet()); + } + outputTargets.get(impl).add(target); + } + + @Override + public Iterable materialize(PCollection pcollection) { + + PCollectionImpl pcollectionImpl = toPcollectionImpl(pcollection); + ReadableSourceTarget srcTarget = getMaterializeSourceTarget(pcollectionImpl); + + MaterializableIterable c = new MaterializableIterable(this, srcTarget); + if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) { + outputTargetsToMaterialize.put(pcollectionImpl, c); + } + return c; + } + + /** + * Retrieve a ReadableSourceTarget that provides access to the contents of a + * {@link PCollection}. This is primarily intended as a helper method to + * {@link #materialize(PCollection)}. The underlying data of the + * ReadableSourceTarget may not be actually present until the pipeline is run. + * + * @param pcollection + * The collection for which the ReadableSourceTarget is to be + * retrieved + * @return The ReadableSourceTarget + * @throws IllegalArgumentException + * If no ReadableSourceTarget can be retrieved for the given + * PCollection + */ + public ReadableSourceTarget getMaterializeSourceTarget(PCollection pcollection) { + PCollectionImpl impl = toPcollectionImpl(pcollection); + SourceTarget matTarget = impl.getMaterializedAt(); + if (matTarget != null && matTarget instanceof ReadableSourceTarget) { + return (ReadableSourceTarget) matTarget; + } + + ReadableSourceTarget srcTarget = null; + if (outputTargets.containsKey(pcollection)) { + for (Target target : outputTargets.get(impl)) { + if (target instanceof ReadableSourceTarget) { + srcTarget = (ReadableSourceTarget) target; + break; + } + } + } + + if (srcTarget == null) { + SourceTarget st = createIntermediateOutput(pcollection.getPType()); + if (!(st instanceof ReadableSourceTarget)) { + throw new IllegalArgumentException("The PType for the given PCollection is not readable" + + " and cannot be materialized"); + } else { + srcTarget = (ReadableSourceTarget) st; + addOutput(impl, srcTarget); + } + } + + return srcTarget; + } + + /** + * Safely cast a PCollection into a PCollectionImpl, including handling the case of UnionCollections. + * @param pcollection The PCollection to be cast/transformed + * @return The PCollectionImpl representation + */ + private PCollectionImpl toPcollectionImpl(PCollection pcollection) { + PCollectionImpl pcollectionImpl = null; + if (pcollection instanceof UnionCollection) { + pcollectionImpl = (PCollectionImpl) pcollection.parallelDo("UnionCollectionWrapper", + (MapFn) IdentityFn. getInstance(), pcollection.getPType()); + } else { + pcollectionImpl = (PCollectionImpl) pcollection; + } + return pcollectionImpl; + } + + public SourceTarget createIntermediateOutput(PType ptype) { + return ptype.getDefaultFileSource(createTempPath()); + } + + public Path createTempPath() { + tempFileIndex++; + return new Path(tempDirectory, "p" + tempFileIndex); + } + + private static Path createTempDirectory(Configuration conf) { + Path dir = new Path("/tmp/crunch" + RANDOM.nextInt()); + try { + FileSystem.get(conf).mkdirs(dir); + } catch (IOException e) { + LOG.error("Exception creating job output directory", e); + throw new RuntimeException(e); + } + return dir; + } + + @Override + public void writeTextFile(PCollection pcollection, String pathName) { + // Ensure that this is a writable pcollection instance. + pcollection = pcollection.parallelDo("asText", IdentityFn. getInstance(), WritableTypeFamily + .getInstance().as(pcollection.getPType())); + write(pcollection, At.textFile(pathName)); + } + + private void cleanup() { + if (!outputTargets.isEmpty()) { + LOG.warn("Not running cleanup while output targets remain"); + return; + } + try { + FileSystem fs = FileSystem.get(conf); + if (fs.exists(tempDirectory)) { + fs.delete(tempDirectory, true); + } + } catch (IOException e) { + LOG.info("Exception during cleanup", e); + } + } + + public int getNextAnonymousStageId() { + return nextAnonymousStageId++; + } + + @Override + public void enableDebug() { + // Turn on Crunch runtime error catching. + getConfiguration().setBoolean(RuntimeParameters.DEBUG, true); + + // Write Hadoop's WARN logs to the console. + Logger crunchInfoLogger = LogManager.getLogger("org.apache.crunch"); + Appender console = crunchInfoLogger.getAppender("A"); + if (console != null) { + Logger hadoopLogger = LogManager.getLogger("org.apache.hadoop"); + hadoopLogger.setLevel(Level.WARN); + hadoopLogger.addAppender(console); + } else { + LOG.warn("Could not find console appender named 'A' for writing Hadoop warning logs"); + } + } + + @Override + public String getName() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java new file mode 100644 index 0000000..3c9d522 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.collect; + +import java.util.List; + +import org.apache.crunch.DoFn; +import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.types.PType; +import com.google.common.collect.ImmutableList; + +public class DoCollectionImpl extends PCollectionImpl { + + private final PCollectionImpl parent; + private final DoFn fn; + private final PType ntype; + + DoCollectionImpl(String name, PCollectionImpl parent, DoFn fn, + PType ntype) { + super(name); + this.parent = (PCollectionImpl) parent; + this.fn = (DoFn) fn; + this.ntype = ntype; + } + + @Override + protected long getSizeInternal() { + return (long) (fn.scaleFactor() * parent.getSize()); + } + + @Override + public PType getPType() { + return ntype; + } + + @Override + protected void acceptInternal(PCollectionImpl.Visitor visitor) { + visitor.visitDoFnCollection(this); + } + + @Override + public List> getParents() { + return ImmutableList.> of(parent); + } + + @Override + public DoNode createDoNode() { + return DoNode.createFnNode(getName(), fn, ntype); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java new file mode 100644 index 0000000..3e5a275 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.collect; + +import java.util.List; + +import org.apache.crunch.CombineFn; +import org.apache.crunch.DoFn; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import com.google.common.collect.ImmutableList; + +public class DoTableImpl extends PTableBase implements + PTable { + + private final PCollectionImpl parent; + private final DoFn> fn; + private final PTableType type; + + DoTableImpl(String name, PCollectionImpl parent, + DoFn> fn, PTableType ntype) { + super(name); + this.parent = parent; + this.fn = fn; + this.type = ntype; + } + + @Override + protected long getSizeInternal() { + return (long) (fn.scaleFactor() * parent.getSize()); + } + + @Override + public PTableType getPTableType() { + return type; + } + + @Override + protected void acceptInternal(PCollectionImpl.Visitor visitor) { + visitor.visitDoTable(this); + } + + @Override + public PType> getPType() { + return type; + } + + @Override + public List> getParents() { + return ImmutableList.> of(parent); + } + + @Override + public DoNode createDoNode() { + return DoNode.createFnNode(getName(), fn, type); + } + + public boolean hasCombineFn() { + return fn instanceof CombineFn; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java b/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java new file mode 100644 index 0000000..5c9b93e --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.collect; + +import java.util.List; + +import org.apache.commons.lang.builder.HashCodeBuilder; + +import org.apache.crunch.Source; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.types.PType; +import com.google.common.collect.ImmutableList; + +public class InputCollection extends PCollectionImpl { + + private final Source source; + + public InputCollection(Source source, MRPipeline pipeline) { + super(source.toString()); + this.source = source; + this.pipeline = pipeline; + } + + @Override + public PType getPType() { + return source.getType(); + } + + public Source getSource() { + return source; + } + + @Override + protected long getSizeInternal() { + long sz = source.getSize(pipeline.getConfiguration()); + if (sz < 0) { + throw new IllegalStateException("Input source " + source + " does not exist!"); + } + return sz; + } + + @Override + protected void acceptInternal(PCollectionImpl.Visitor visitor) { + visitor.visitInputCollection(this); + } + + @Override + public List> getParents() { + return ImmutableList.of(); + } + + @Override + public DoNode createDoNode() { + return DoNode.createInputNode(source); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof InputCollection)) { + return false; + } + return source.equals(((InputCollection) obj).source); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(source).toHashCode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java new file mode 100644 index 0000000..617d768 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.collect; + +import java.util.List; + +import org.apache.crunch.Pair; +import org.apache.crunch.TableSource; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import com.google.common.collect.ImmutableList; + +public class InputTable extends PTableBase { + + private final TableSource source; + private final InputCollection> asCollection; + + public InputTable(TableSource source, MRPipeline pipeline) { + super(source.toString()); + this.source = source; + this.pipeline = pipeline; + this.asCollection = new InputCollection>(source, pipeline); + } + + @Override + protected long getSizeInternal() { + return asCollection.getSizeInternal(); + } + + @Override + public PTableType getPTableType() { + return source.getTableType(); + } + + @Override + public PType> getPType() { + return source.getType(); + } + + @Override + public List> getParents() { + return ImmutableList.of(); + } + + @Override + protected void acceptInternal(PCollectionImpl.Visitor visitor) { + visitor.visitInputCollection(asCollection); + } + + @Override + public DoNode createDoNode() { + return DoNode.createInputNode(source); + } + + @Override + public int hashCode() { + return asCollection.hashCode(); + } + + @Override + public boolean equals(Object other) { + return asCollection.equals(other); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java new file mode 100644 index 0000000..a9e8401 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.collect; + +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.crunch.DoFn; +import org.apache.crunch.FilterFn; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; +import org.apache.crunch.fn.ExtractKeyFn; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.lib.Sample; +import org.apache.crunch.lib.Sort; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; +import com.google.common.collect.Lists; + +public abstract class PCollectionImpl implements PCollection { + + private static final Log LOG = LogFactory.getLog(PCollectionImpl.class); + + private final String name; + protected MRPipeline pipeline; + private SourceTarget materializedAt; + + public PCollectionImpl(String name) { + this.name = name; + } + + @Override + public String getName() { + return name; + } + + @Override + public String toString() { + return getName(); + } + + @Override + public PCollection union(PCollection... collections) { + List> internal = Lists.newArrayList(); + internal.add(this); + for (PCollection collection : collections) { + internal.add((PCollectionImpl) collection); + } + return new UnionCollection(internal); + } + + @Override + public PCollection parallelDo(DoFn fn, PType type) { + MRPipeline pipeline = (MRPipeline) getPipeline(); + return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type); + } + + @Override + public PCollection parallelDo(String name, DoFn fn, PType type) { + return new DoCollectionImpl(name, this, fn, type); + } + + @Override + public PTable parallelDo(DoFn> fn, PTableType type) { + MRPipeline pipeline = (MRPipeline) getPipeline(); + return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type); + } + + @Override + public PTable parallelDo(String name, DoFn> fn, PTableType type) { + return new DoTableImpl(name, this, fn, type); + } + + @Override + public PCollection write(Target target) { + getPipeline().write(this, target); + return this; + } + + @Override + public Iterable materialize() { + if (getSize() == 0) { + LOG.warn("Materializing an empty PCollection: " + this.getName()); + return Collections.emptyList(); + } + return getPipeline().materialize(this); + } + + public SourceTarget getMaterializedAt() { + return materializedAt; + } + + public void materializeAt(SourceTarget sourceTarget) { + this.materializedAt = sourceTarget; + } + + @Override + public PCollection filter(FilterFn filterFn) { + return parallelDo(filterFn, getPType()); + } + + @Override + public PCollection filter(String name, FilterFn filterFn) { + return parallelDo(name, filterFn, getPType()); + } + + @Override + public PTable by(MapFn mapFn, PType keyType) { + return parallelDo(new ExtractKeyFn(mapFn), getTypeFamily().tableOf(keyType, getPType())); + } + + @Override + public PTable by(String name, MapFn mapFn, PType keyType) { + return parallelDo(name, new ExtractKeyFn(mapFn), getTypeFamily().tableOf(keyType, getPType())); + } + + @Override + public PCollection sort(boolean ascending) { + return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING); + } + + @Override + public PTable count() { + return Aggregate.count(this); + } + + @Override + public PCollection max() { + return Aggregate.max(this); + } + + @Override + public PCollection min() { + return Aggregate.min(this); + } + + @Override + public PCollection sample(double acceptanceProbability) { + return Sample.sample(this, acceptanceProbability); + } + + @Override + public PCollection sample(double acceptanceProbability, long seed) { + return Sample.sample(this, seed, acceptanceProbability); + } + + @Override + public PTypeFamily getTypeFamily() { + return getPType().getFamily(); + } + + public abstract DoNode createDoNode(); + + public abstract List> getParents(); + + public PCollectionImpl getOnlyParent() { + List> parents = getParents(); + if (parents.size() != 1) { + throw new IllegalArgumentException("Expected exactly one parent PCollection"); + } + return parents.get(0); + } + + @Override + public Pipeline getPipeline() { + if (pipeline == null) { + pipeline = (MRPipeline) getParents().get(0).getPipeline(); + } + return pipeline; + } + + public int getDepth() { + int parentMax = 0; + for (PCollectionImpl parent : getParents()) { + parentMax = Math.max(parent.getDepth(), parentMax); + } + return 1 + parentMax; + } + + public interface Visitor { + void visitInputCollection(InputCollection collection); + + void visitUnionCollection(UnionCollection collection); + + void visitDoFnCollection(DoCollectionImpl collection); + + void visitDoTable(DoTableImpl collection); + + void visitGroupedTable(PGroupedTableImpl collection); + } + + public void accept(Visitor visitor) { + if (materializedAt != null) { + visitor.visitInputCollection(new InputCollection(materializedAt, + (MRPipeline) getPipeline())); + } else { + acceptInternal(visitor); + } + } + + protected abstract void acceptInternal(Visitor visitor); + + @Override + public long getSize() { + if (materializedAt != null) { + long sz = materializedAt.getSize(getPipeline().getConfiguration()); + if (sz > 0) { + return sz; + } + } + return getSizeInternal(); + } + + protected abstract long getSizeInternal(); +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java new file mode 100644 index 0000000..13e5567 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.collect; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.Job; + +import org.apache.crunch.CombineFn; +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.types.PGroupedTableType; +import org.apache.crunch.types.PType; +import com.google.common.collect.ImmutableList; + +public class PGroupedTableImpl extends + PCollectionImpl>> implements PGroupedTable { + + private static final Log LOG = LogFactory.getLog(PGroupedTableImpl.class); + + private final PTableBase parent; + private final GroupingOptions groupingOptions; + private final PGroupedTableType ptype; + + PGroupedTableImpl(PTableBase parent) { + this(parent, null); + } + + PGroupedTableImpl(PTableBase parent, GroupingOptions groupingOptions) { + super("GBK"); + this.parent = parent; + this.groupingOptions = groupingOptions; + this.ptype = parent.getPTableType().getGroupedTableType(); + } + + public void configureShuffle(Job job) { + ptype.configureShuffle(job, groupingOptions); + if (groupingOptions == null || groupingOptions.getNumReducers() <= 0) { + long bytesPerTask = job.getConfiguration().getLong("crunch.bytes.per.reduce.task", + (1000L * 1000L * 1000L)); + int numReduceTasks = 1 + (int) (getSize() / bytesPerTask); + if (numReduceTasks > 0) { + job.setNumReduceTasks(numReduceTasks); + LOG.info(String.format("Setting num reduce tasks to %d", numReduceTasks)); + } else { + LOG.warn("Attempted to set a negative number of reduce tasks"); + } + } + } + + @Override + protected long getSizeInternal() { + return parent.getSizeInternal(); + } + + @Override + public PType>> getPType() { + return ptype; + } + + public PTable combineValues(CombineFn combineFn) { + return new DoTableImpl("combine", this, combineFn, + parent.getPTableType()); + } + + private static class Ungroup extends DoFn>, Pair> { + @Override + public void process(Pair> input, Emitter> emitter) { + for (V v : input.second()) { + emitter.emit(Pair.of(input.first(), v)); + } + } + } + + public PTable ungroup() { + return parallelDo("ungroup", new Ungroup(), parent.getPTableType()); + } + + @Override + protected void acceptInternal(PCollectionImpl.Visitor visitor) { + visitor.visitGroupedTable(this); + } + + @Override + public List> getParents() { + return ImmutableList.> of(parent); + } + + @Override + public DoNode createDoNode() { + return DoNode.createFnNode(getName(), + ptype.getInputMapFn(), ptype); + } + + public DoNode getGroupingNode() { + return DoNode.createGroupingNode("", ptype); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java new file mode 100644 index 0000000..f480001 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.collect; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Target; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.lib.Cogroup; +import org.apache.crunch.lib.Join; +import org.apache.crunch.lib.PTables; +import org.apache.crunch.materialize.MaterializableMap; +import org.apache.crunch.types.PType; +import com.google.common.collect.Lists; + +public abstract class PTableBase extends PCollectionImpl> + implements PTable { + + public PTableBase(String name) { + super(name); + } + + public PType getKeyType() { + return getPTableType().getKeyType(); + } + + public PType getValueType() { + return getPTableType().getValueType(); + } + + public PGroupedTableImpl groupByKey() { + return new PGroupedTableImpl(this); + } + + public PGroupedTableImpl groupByKey(int numReduceTasks) { + return new PGroupedTableImpl(this, + GroupingOptions.builder().numReducers(numReduceTasks).build()); + } + + public PGroupedTableImpl groupByKey(GroupingOptions groupingOptions) { + return new PGroupedTableImpl(this, groupingOptions); + } + + @Override + public PTable union(PTable... others) { + List> internal = Lists.newArrayList(); + internal.add(this); + for (PTable table : others) { + internal.add((PTableBase) table); + } + return new UnionTable(internal); + } + + @Override + public PTable write(Target target) { + getPipeline().write(this, target); + return this; + } + + @Override + public PTable top(int count) { + return Aggregate.top(this, count, true); + } + + @Override + public PTable bottom(int count) { + return Aggregate.top(this, count, false); + } + + @Override + public PTable> collectValues() { + return Aggregate.collectValues(this); + } + + @Override + public PTable> join(PTable other) { + return Join.join(this, other); + } + + @Override + public PTable, Collection>> cogroup(PTable other) { + return Cogroup.cogroup(this, other); + } + + @Override + public PCollection keys() { + return PTables.keys(this); + } + + @Override + public PCollection values() { + return PTables.values(this); + } + + /** + * Returns a Map made up of the keys and values in this PTable. + */ + public Map materializeToMap() { + return new MaterializableMap(this.materialize()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java b/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java new file mode 100644 index 0000000..19a1161 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.collect; + +import java.util.List; + +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.types.PType; +import com.google.common.collect.ImmutableList; + +public class UnionCollection extends PCollectionImpl { + + private List> parents; + private long size = 0; + + private static String flatName(List> collections) { + StringBuilder sb = new StringBuilder("union("); + for (int i = 0; i < collections.size(); i++) { + if (i != 0) { + sb.append(','); + } + sb.append(collections.get(i).getName()); + } + return sb.append(')').toString(); + } + + UnionCollection(List> collections) { + super(flatName(collections)); + this.parents = ImmutableList.copyOf(collections); + this.pipeline = (MRPipeline) parents.get(0).getPipeline(); + for (PCollectionImpl parent : parents) { + if (this.pipeline != parent.getPipeline()) { + throw new IllegalStateException( + "Cannot union PCollections from different Pipeline instances"); + } + size += parent.getSize(); + } + } + + @Override + protected long getSizeInternal() { + return size; + } + + @Override + protected void acceptInternal(PCollectionImpl.Visitor visitor) { + visitor.visitUnionCollection(this); + } + + @Override + public PType getPType() { + return parents.get(0).getPType(); + } + + @Override + public List> getParents() { + return ImmutableList.> copyOf(parents); + } + + @Override + public DoNode createDoNode() { + throw new UnsupportedOperationException( + "Unioned collection does not support DoNodes"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java b/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java new file mode 100644 index 0000000..f713912 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.collect; + +import java.util.List; + +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class UnionTable extends PTableBase { + + private PTableType ptype; + private List>> parents; + private long size; + + private static String flatName(List> tables) { + StringBuilder sb = new StringBuilder("union("); + for (int i = 0; i < tables.size(); i++) { + if (i != 0) { + sb.append(','); + } + sb.append(tables.get(i).getName()); + } + return sb.append(')').toString(); + } + + public UnionTable(List> tables) { + super(flatName(tables)); + this.ptype = tables.get(0).getPTableType(); + this.pipeline = (MRPipeline) tables.get(0).getPipeline(); + this.parents = Lists.newArrayList(); + for (PTableBase parent : tables) { + if (pipeline != parent.getPipeline()) { + throw new IllegalStateException( + "Cannot union PTables from different Pipeline instances"); + } + this.parents.add(parent); + size += parent.getSize(); + } + } + + @Override + protected long getSizeInternal() { + return size; + } + + @Override + public PTableType getPTableType() { + return ptype; + } + + @Override + public PType> getPType() { + return ptype; + } + + @Override + public List> getParents() { + return ImmutableList.> copyOf(parents); + } + + @Override + protected void acceptInternal(PCollectionImpl.Visitor visitor) { + visitor.visitUnionCollection(new UnionCollection>( + parents)); + } + + @Override + public DoNode createDoNode() { + throw new UnsupportedOperationException( + "Unioned table does not support do nodes"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java new file mode 100644 index 0000000..242aa4d --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.emit; + +import java.util.List; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.impl.mr.run.RTNode; +import com.google.common.collect.ImmutableList; + +/** + * An {@link Emitter} implementation that links the output of one {@link DoFn} + * to the input of another {@code DoFn}. + * + */ +public class IntermediateEmitter implements Emitter { + + private final List children; + + public IntermediateEmitter(List children) { + this.children = ImmutableList.copyOf(children); + } + + public void emit(Object emitted) { + for (RTNode child : children) { + child.process(emitted); + } + } + + public void flush() { + // No-op + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java b/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java new file mode 100644 index 0000000..5f52f41 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.emit; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs; + +import org.apache.crunch.Emitter; +import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import org.apache.crunch.types.Converter; + +public class MultipleOutputEmitter implements Emitter { + + private final Converter converter; + private final CrunchMultipleOutputs outputs; + private final String outputName; + + public MultipleOutputEmitter(Converter converter, + CrunchMultipleOutputs outputs, String outputName) { + this.converter = converter; + this.outputs = outputs; + this.outputName = outputName; + } + + @Override + public void emit(T emitted) { + try { + this.outputs.write(outputName, converter.outputKey(emitted), + converter.outputValue(emitted)); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } catch (InterruptedException e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public void flush() { + // No-op + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java b/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java new file mode 100644 index 0000000..deb090c --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.emit; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +import org.apache.crunch.Emitter; +import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import org.apache.crunch.types.Converter; + +public class OutputEmitter implements Emitter { + + private final Converter converter; + private final TaskInputOutputContext context; + + public OutputEmitter(Converter converter, + TaskInputOutputContext context) { + this.converter = converter; + this.context = context; + } + + public void emit(T emitted) { + try { + K key = converter.outputKey(emitted); + V value = converter.outputValue(emitted); + this.context.write(key, value); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } catch (InterruptedException e) { + throw new CrunchRuntimeException(e); + } + } + + public void flush() { + // No-op + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java b/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java new file mode 100644 index 0000000..c790c1d --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.exec; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; +import org.apache.hadoop.util.StringUtils; + +import org.apache.crunch.impl.mr.plan.MSCROutputHandler; +import org.apache.crunch.impl.mr.plan.PlanningParameters; +import com.google.common.collect.Lists; + +public class CrunchJob extends CrunchControlledJob { + + private final Log log = LogFactory.getLog(CrunchJob.class); + + private final Path workingPath; + private final List multiPaths; + private final boolean mapOnlyJob; + + public CrunchJob(Job job, Path workingPath, MSCROutputHandler handler) throws IOException { + super(job, Lists.newArrayList()); + this.workingPath = workingPath; + this.multiPaths = handler.getMultiPaths(); + this.mapOnlyJob = handler.isMapOnlyJob(); + } + + private synchronized void handleMultiPaths() throws IOException { + if (!multiPaths.isEmpty()) { + // Need to handle moving the data from the output directory of the + // job to the output locations specified in the paths. + FileSystem fs = FileSystem.get(job.getConfiguration()); + for (int i = 0; i < multiPaths.size(); i++) { + Path src = new Path(workingPath, + PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*"); + Path[] srcs = FileUtil.stat2Paths(fs.globStatus(src), src); + Path dst = multiPaths.get(i); + if (!fs.exists(dst)) { + fs.mkdirs(dst); + } + int minPartIndex = getMinPartIndex(dst, fs); + for (Path s : srcs) { + fs.rename(s, getDestFile(s, dst, minPartIndex++)); + } + } + } + } + + private Path getDestFile(Path src, Path dir, int index) { + String form = "part-%s-%05d"; + if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) { + form = form + org.apache.avro.mapred.AvroOutputFormat.EXT; + } + return new Path(dir, String.format(form, mapOnlyJob ? "m" : "r", index)); + } + + private int getMinPartIndex(Path path, FileSystem fs) throws IOException { + // Quick and dirty way to ensure unique naming in the directory + return fs.listStatus(path).length; + } + + @Override + protected void checkRunningState() throws IOException, InterruptedException { + try { + if (job.isComplete()) { + if (job.isSuccessful()) { + handleMultiPaths(); + this.state = State.SUCCESS; + } else { + this.state = State.FAILED; + this.message = "Job failed!"; + } + } + } catch (IOException ioe) { + this.state = State.FAILED; + this.message = StringUtils.stringifyException(ioe); + try { + if (job != null) { + job.killJob(); + } + } catch (IOException e) { + } + } + } + + @Override + protected synchronized void submit() { + super.submit(); + if (this.state == State.RUNNING) { + log.info("Running job \"" + getJobName() + "\""); + log.info("Job status available at: " + job.getTrackingURL()); + } else { + log.info("Error occurred starting job \"" + getJobName() + "\":"); + log.info(getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java new file mode 100644 index 0000000..b678187 --- /dev/null +++ b/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.exec; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; +import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl; + +import org.apache.crunch.PipelineResult; +import com.google.common.collect.Lists; + +/** + * + * + */ +public class MRExecutor { + + private static final Log LOG = LogFactory.getLog(MRExecutor.class); + + private final CrunchJobControl control; + + public MRExecutor(Class jarClass) { + this.control = new CrunchJobControl(jarClass.toString()); + } + + public void addJob(CrunchJob job) { + this.control.addJob(job); + } + + public PipelineResult execute() { + try { + Thread controlThread = new Thread(control); + controlThread.start(); + while (!control.allFinished()) { + Thread.sleep(1000); + } + control.stop(); + } catch (InterruptedException e) { + LOG.info(e); + } + List failures = control.getFailedJobList(); + if (!failures.isEmpty()) { + System.err.println(failures.size() + " job failure(s) occurred:"); + for (CrunchControlledJob job : failures) { + System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage()); + } + } + List stages = Lists.newArrayList(); + for (CrunchControlledJob job : control.getSuccessfulJobList()) { + try { + stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters())); + } catch (Exception e) { + LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e); + } + } + return new PipelineResult(stages); + } +}