From: jwills@apache.org
To: crunch-commits@incubator.apache.org
Reply-To: crunch-dev@incubator.apache.org
Subject: [10/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Message-Id: <20120711051446.2BD0CDAE0@tyr.zones.apache.org>
Date: Wed, 11 Jul 2012 05:14:46 +0000 (UTC)

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
new file mode 100644
index 0000000..08b6c64
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -0,0 +1,634 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.crunch.types.writable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +import org.apache.crunch.MapFn; +import org.apache.crunch.Pair; +import org.apache.crunch.Tuple; +import org.apache.crunch.Tuple3; +import org.apache.crunch.Tuple4; +import org.apache.crunch.TupleN; +import org.apache.crunch.fn.CompositeMapFn; +import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.TupleFactory; +import org.apache.crunch.util.PTypes; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Defines static methods that are analogous to the methods defined in + * {@link WritableTypeFamily} for convenient static importing. + * + */ +public class Writables { + private static final MapFn NULL_WRITABLE_TO_VOID = new MapFn() { + @Override + public Void map(NullWritable input) { + return null; + } + }; + + private static final MapFn VOID_TO_NULL_WRITABLE = new MapFn() { + @Override + public NullWritable map(Void input) { + return NullWritable.get(); + } + }; + + private static final MapFn TEXT_TO_STRING = new MapFn() { + @Override + public String map(Text input) { + return input.toString(); + } + }; + + private static final MapFn STRING_TO_TEXT = new MapFn() { + @Override + public Text map(String input) { + return new Text(input); + } + }; + + private static final MapFn IW_TO_INT = new MapFn() { + @Override + public Integer map(IntWritable input) { + return input.get(); + } + }; + + private static final MapFn INT_TO_IW = new MapFn() { + @Override + public IntWritable map(Integer input) { + return new IntWritable(input); + } + }; + + private static final MapFn LW_TO_LONG = new MapFn() { + @Override + public Long map(LongWritable input) { + return input.get(); + } + }; + + private static final MapFn LONG_TO_LW = new MapFn() { + @Override + public LongWritable map(Long input) { + return new LongWritable(input); + } + }; + + private static final MapFn FW_TO_FLOAT = new MapFn() { + @Override + public Float map(FloatWritable input) { + return input.get(); + } + }; + + private static final MapFn FLOAT_TO_FW = new MapFn() { + @Override + public FloatWritable map(Float input) { + return new FloatWritable(input); + } + }; + + private static final MapFn DW_TO_DOUBLE = new MapFn() { + @Override + public Double map(DoubleWritable input) { + return input.get(); + } + }; + + private static final MapFn DOUBLE_TO_DW = new MapFn() { + @Override + public DoubleWritable map(Double input) { + return new DoubleWritable(input); + } + }; + + private static final MapFn BW_TO_BOOLEAN = new MapFn() { + @Override + public Boolean map(BooleanWritable input) { + return input.get(); + } + }; + + private static final 
BooleanWritable TRUE = new BooleanWritable(true); + private static final BooleanWritable FALSE = new BooleanWritable(false); + private static final MapFn BOOLEAN_TO_BW = new MapFn() { + @Override + public BooleanWritable map(Boolean input) { + return input == Boolean.TRUE ? TRUE : FALSE; + } + }; + + private static final MapFn BW_TO_BB = new MapFn() { + @Override + public ByteBuffer map(BytesWritable input) { + return ByteBuffer.wrap(input.getBytes(), 0, input.getLength()); + } + }; + + private static final MapFn BB_TO_BW = new MapFn() { + @Override + public BytesWritable map(ByteBuffer input) { + BytesWritable bw = new BytesWritable(); + bw.set(input.array(), input.arrayOffset(), input.limit()); + return bw; + } + }; + + private static WritableType create(Class typeClass, + Class writableClass, MapFn inputDoFn, MapFn outputDoFn) { + return new WritableType(typeClass, writableClass, inputDoFn, + outputDoFn); + } + + private static final WritableType nulls = create(Void.class, NullWritable.class, + NULL_WRITABLE_TO_VOID, VOID_TO_NULL_WRITABLE); + private static final WritableType strings = create(String.class, Text.class, + TEXT_TO_STRING, STRING_TO_TEXT); + private static final WritableType longs = create(Long.class, LongWritable.class, + LW_TO_LONG, LONG_TO_LW); + private static final WritableType ints = create(Integer.class, IntWritable.class, + IW_TO_INT, INT_TO_IW); + private static final WritableType floats = create(Float.class, FloatWritable.class, + FW_TO_FLOAT, FLOAT_TO_FW); + private static final WritableType doubles = create(Double.class, + DoubleWritable.class, DW_TO_DOUBLE, DOUBLE_TO_DW); + private static final WritableType booleans = create(Boolean.class, + BooleanWritable.class, BW_TO_BOOLEAN, BOOLEAN_TO_BW); + private static final WritableType bytes = create(ByteBuffer.class, + BytesWritable.class, BW_TO_BB, BB_TO_BW); + + private static final Map, PType> PRIMITIVES = ImmutableMap., PType>builder() + .put(String.class, strings) + .put(Long.class, longs) + .put(Integer.class, ints) + .put(Float.class, floats) + .put(Double.class, doubles) + .put(Boolean.class, booleans) + .put(ByteBuffer.class, bytes) + .build(); + + private static final Map, WritableType> EXTENSIONS = Maps.newHashMap(); + + public static PType getPrimitiveType(Class clazz) { + return (PType) PRIMITIVES.get(clazz); + } + + public static void register(Class clazz, WritableType ptype) { + EXTENSIONS.put(clazz, ptype); + } + + public static final WritableType nulls() { + return nulls; + } + + public static final WritableType strings() { + return strings; + } + + public static final WritableType longs() { + return longs; + } + + public static final WritableType ints() { + return ints; + } + + public static final WritableType floats() { + return floats; + } + + public static final WritableType doubles() { + return doubles; + } + + public static final WritableType booleans() { + return booleans; + } + + public static final WritableType bytes() { + return bytes; + } + + public static final WritableType records(Class clazz) { + if (EXTENSIONS.containsKey(clazz)) { + return (WritableType) EXTENSIONS.get(clazz); + } + return (WritableType) writables(clazz.asSubclass(Writable.class)); + } + + public static WritableType writables(Class clazz) { + MapFn wIdentity = IdentityFn.getInstance(); + return new WritableType(clazz, clazz, wIdentity, wIdentity); + } + + public static WritableTableType tableOf( + PType key, PType value) { + if (key instanceof WritableTableType) { + WritableTableType wtt = (WritableTableType) key; + 
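+      // A table used as a key is first converted to the equivalent pair of its key and
+      // value types, so a nested table is handled as an ordinary tuple.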
key = pairs(wtt.getKeyType(), wtt.getValueType()); + } else if (!(key instanceof WritableType)) { + throw new IllegalArgumentException("Key type must be of class WritableType"); + } + if (value instanceof WritableTableType) { + WritableTableType wtt = (WritableTableType) value; + value = pairs(wtt.getKeyType(), wtt.getValueType()); + } else if (!(value instanceof WritableType)) { + throw new IllegalArgumentException("Value type must be of class WritableType"); + } + return new WritableTableType((WritableType) key, (WritableType) value); + } + + /** + * For mapping from {@link TupleWritable} instances to {@link Tuple}s. + * + */ + private static class TWTupleMapFn extends MapFn { + private final TupleFactory tupleFactory; + private final List fns; + + private transient Object[] values; + + public TWTupleMapFn(TupleFactory tupleFactory, PType... ptypes) { + this.tupleFactory = tupleFactory; + this.fns = Lists.newArrayList(); + for (PType ptype : ptypes) { + fns.add(ptype.getInputMapFn()); + } + } + + @Override + public void configure(Configuration conf) { + for (MapFn fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setConfigurationForTest(Configuration conf) { + for (MapFn fn : fns) { + fn.setConfigurationForTest(conf); + } + } + + @Override + public void initialize() { + for (MapFn fn : fns) { + fn.setContext(getContext()); + } + // The rest of the methods allocate new + // objects each time. However this one + // uses Tuple.tuplify which does a copy + this.values = new Object[fns.size()]; + tupleFactory.initialize(); + } + + @Override + public Tuple map(TupleWritable in) { + for (int i = 0; i < values.length; i++) { + if (in.has(i)) { + values[i] = fns.get(i).map(in.get(i)); + } else { + values[i] = null; + } + } + return tupleFactory.makeTuple(values); + } + } + + /** + * For mapping from {@code Tuple}s to {@code TupleWritable}s. + * + */ + private static class TupleTWMapFn extends MapFn { + + private transient TupleWritable writable; + private transient Writable[] values; + + private final List fns; + + public TupleTWMapFn(PType... 
ptypes) { + this.fns = Lists.newArrayList(); + for (PType ptype : ptypes) { + fns.add(ptype.getOutputMapFn()); + } + } + + @Override + public void configure(Configuration conf) { + for (MapFn fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setConfigurationForTest(Configuration conf) { + for (MapFn fn : fns) { + fn.setConfigurationForTest(conf); + } + } + + @Override + public void initialize() { + this.values = new Writable[fns.size()]; + this.writable = new TupleWritable(values); + for (MapFn fn : fns) { + fn.setContext(getContext()); + } + } + + @Override + public TupleWritable map(Tuple input) { + writable.clearWritten(); + for (int i = 0; i < input.size(); i++) { + Object value = input.get(i); + if (value != null) { + writable.setWritten(i); + values[i] = (Writable) fns.get(i).map(value); + } + } + return writable; + } + } + + public static WritableType, TupleWritable> pairs(PType p1, PType p2) { + TWTupleMapFn input = new TWTupleMapFn(TupleFactory.PAIR, p1, p2); + TupleTWMapFn output = new TupleTWMapFn(p1, p2); + return new WritableType(Pair.class, TupleWritable.class, input, output, p1, p2); + } + + public static WritableType, TupleWritable> triples(PType p1, + PType p2, PType p3) { + TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE3, p1, p2, p3); + TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3); + return new WritableType(Tuple3.class, TupleWritable.class, + input, + output, + p1, p2, p3); + } + + public static WritableType, TupleWritable> quads(PType p1, + PType p2, PType p3, PType p4) { + TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE4, p1, p2, p3, p4); + TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3, p4); + return new WritableType(Tuple4.class, TupleWritable.class, + input, + output, + p1, p2, p3, p4); + } + + public static WritableType tuples(PType... ptypes) { + TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLEN, ptypes); + TupleTWMapFn output = new TupleTWMapFn(ptypes); + return new WritableType(TupleN.class, TupleWritable.class, input, output, ptypes); + } + + public static PType tuples(Class clazz, PType... 
ptypes) { + Class[] typeArgs = new Class[ptypes.length]; + for (int i = 0; i < typeArgs.length; i++) { + typeArgs[i] = ptypes[i].getTypeClass(); + } + TupleFactory factory = TupleFactory.create(clazz, typeArgs); + TWTupleMapFn input = new TWTupleMapFn(factory, ptypes); + TupleTWMapFn output = new TupleTWMapFn(ptypes); + return new WritableType(clazz, TupleWritable.class, input, output, ptypes); + } + + public static PType derived(Class clazz, MapFn inputFn, MapFn outputFn, + PType base) { + WritableType wt = (WritableType) base; + MapFn input = new CompositeMapFn(wt.getInputMapFn(), inputFn); + MapFn output = new CompositeMapFn(outputFn, wt.getOutputMapFn()); + return new WritableType(clazz, wt.getSerializationClass(), input, output, base.getSubTypes().toArray(new PType[0])); + } + + private static class ArrayCollectionMapFn extends + MapFn> { + private final MapFn mapFn; + + public ArrayCollectionMapFn(MapFn mapFn) { + this.mapFn = mapFn; + } + + @Override + public void configure(Configuration conf) { + mapFn.configure(conf); + } + + @Override + public void setConfigurationForTest(Configuration conf) { + mapFn.setConfigurationForTest(conf); + } + + @Override + public void initialize() { + mapFn.setContext(getContext()); + } + + @Override + public Collection map(GenericArrayWritable input) { + Collection collection = Lists.newArrayList(); + for (Writable writable : input.get()) { + collection.add(mapFn.map(writable)); + } + return collection; + } + } + + private static class CollectionArrayMapFn extends + MapFn, GenericArrayWritable> { + + private final Class clazz; + private final MapFn mapFn; + + public CollectionArrayMapFn(Class clazz, + MapFn mapFn) { + this.clazz = clazz; + this.mapFn = mapFn; + } + + @Override + public void configure(Configuration conf) { + mapFn.configure(conf); + } + + @Override + public void setConfigurationForTest(Configuration conf) { + mapFn.setConfigurationForTest(conf); + } + + @Override + public void initialize() { + mapFn.setContext(getContext()); + } + + @Override + public GenericArrayWritable map(Collection input) { + GenericArrayWritable arrayWritable = new GenericArrayWritable(clazz); + Writable[] w = new Writable[input.size()]; + int index = 0; + for (T in : input) { + w[index++] = ((Writable) mapFn.map(in)); + } + arrayWritable.set(w); + return arrayWritable; + } + } + + public static WritableType, GenericArrayWritable> collections(PType ptype) { + WritableType wt = (WritableType) ptype; + return new WritableType(Collection.class, GenericArrayWritable.class, + new ArrayCollectionMapFn(wt.getInputMapFn()), new CollectionArrayMapFn( + wt.getSerializationClass(), wt.getOutputMapFn()), ptype); + } + + private static class MapInputMapFn extends MapFn, Map> { + private final MapFn mapFn; + + public MapInputMapFn(MapFn mapFn) { + this.mapFn = mapFn; + } + + @Override + public void configure(Configuration conf) { + mapFn.configure(conf); + } + + @Override + public void setConfigurationForTest(Configuration conf) { + mapFn.setConfigurationForTest(conf); + } + + @Override + public void initialize() { + mapFn.setContext(getContext()); + } + + @Override + public Map map(TextMapWritable input) { + Map out = Maps.newHashMap(); + for (Map.Entry e : input.entrySet()) { + out.put(e.getKey().toString(), mapFn.map(e.getValue())); + } + return out; + } + } + + private static class MapOutputMapFn extends MapFn, TextMapWritable> { + + private final Class clazz; + private final MapFn mapFn; + + public MapOutputMapFn(Class clazz, MapFn mapFn) { + this.clazz = clazz; + 
this.mapFn = mapFn; + } + + @Override + public void configure(Configuration conf) { + mapFn.configure(conf); + } + + @Override + public void setConfigurationForTest(Configuration conf) { + mapFn.setConfigurationForTest(conf); + } + + @Override + public void initialize() { + mapFn.setContext(getContext()); + } + + @Override + public TextMapWritable map(Map input) { + TextMapWritable tmw = new TextMapWritable(clazz); + for (Map.Entry e : input.entrySet()) { + tmw.put(new Text(e.getKey()), mapFn.map(e.getValue())); + } + return tmw; + } + } + + public static WritableType, MapWritable> maps(PType ptype) { + WritableType wt = (WritableType) ptype; + return new WritableType(Map.class, TextMapWritable.class, + new MapInputMapFn(wt.getInputMapFn()), + new MapOutputMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype); + } + + public static PType jsons(Class clazz) { + return PTypes.jsonString(clazz, WritableTypeFamily.getInstance()); + } + + /** + * Perform a deep copy of a writable value. + * + * @param value + * The value to be copied + * @param writableClass + * The Writable class of the value to be copied + * @return A fully detached deep copy of the input value + */ + public static T deepCopy(T value, Class writableClass) { + ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(byteOutStream); + T copiedValue = null; + try { + value.write(dataOut); + dataOut.flush(); + ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray()); + DataInput dataInput = new DataInputStream(byteInStream); + copiedValue = writableClass.newInstance(); + copiedValue.readFields(dataInput); + } catch (Exception e) { + throw new CrunchRuntimeException("Error while deep copying " + value, e); + } + return copiedValue; + } + + // Not instantiable + private Writables() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/util/Collects.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/util/Collects.java b/crunch/src/main/java/org/apache/crunch/util/Collects.java new file mode 100644 index 0000000..f5b07c4 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/util/Collects.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.util; + +import java.util.Collection; +import java.util.Iterator; + +import com.google.common.collect.Lists; + +/** + * Utility functions for returning Collection objects backed by different types + * of implementations. 
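+ * <p>A minimal usage sketch:
+ * <pre>{@code
+ *   Collection<String> names = Collects.newArrayList("a", "b", "c");
+ * }</pre>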
+ */
+public class Collects {
+
+  public static <T> Collection<T> newArrayList() {
+    return Lists.newArrayList();
+  }
+
+  public static <T> Collection<T> newArrayList(T... elements) {
+    return Lists.newArrayList(elements);
+  }
+
+  public static <T> Collection<T> newArrayList(Iterable<T> elements) {
+    return Lists.newArrayList(elements);
+  }
+
+  public static <T> Collection<T> newArrayList(Iterator<T> elements) {
+    return Lists.newArrayList(elements);
+  }
+
+  private Collects() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/util/DistCache.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/DistCache.java b/crunch/src/main/java/org/apache/crunch/util/DistCache.java
new file mode 100644
index 0000000..682e8f0
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/util/DistCache.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.Enumeration;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+
+/**
+ * Provides functions for working with Hadoop's distributed cache. These include:
+ * <ul>
+ *   <li>Functions for working with a job-specific distributed cache of objects, like the
+ *   serialized runtime nodes in a MapReduce.</li>
+ *   <li>Functions for adding library jars to the distributed cache, which will be added to the
+ *   classpath of MapReduce tasks.</li>
+ * </ul>
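+ *
+ * <p>A minimal sketch of shipping a jar to task classpaths (the path below is hypothetical):
+ * <pre>{@code
+ *   Configuration conf = new Configuration();
+ *   DistCache.addJarToDistributedCache(conf, "/opt/lib/my-udfs.jar");
+ * }</pre>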
+ */ +public class DistCache { + + // Configuration key holding the paths of jars to export to the distributed cache. + private static final String TMPJARS_KEY = "tmpjars"; + + public static void write(Configuration conf, Path path, Object value) throws IOException { + ObjectOutputStream oos = new ObjectOutputStream(FileSystem.get(conf).create(path)); + oos.writeObject(value); + oos.close(); + + DistributedCache.addCacheFile(path.toUri(), conf); + } + + public static Object read(Configuration conf, Path path) throws IOException { + URI target = null; + for (URI uri : DistributedCache.getCacheFiles(conf)) { + if (uri.toString().equals(path.toString())) { + target = uri; + break; + } + } + Object value = null; + if (target != null) { + Path targetPath = new Path(target.toString()); + ObjectInputStream ois = new ObjectInputStream(targetPath.getFileSystem(conf).open(targetPath)); + try { + value = ois.readObject(); + } catch (ClassNotFoundException e) { + throw new CrunchRuntimeException(e); + } + ois.close(); + } + return value; + } + + /** + * Adds the specified jar to the distributed cache of jobs using the provided configuration. The + * jar will be placed on the classpath of tasks run by the job. + * + * @param conf The configuration used to add the jar to the distributed cache. + * @param jarFile The jar file to add to the distributed cache. + * @throws IOException If the jar file does not exist or there is a problem accessing the file. + */ + public static void addJarToDistributedCache(Configuration conf, File jarFile) throws IOException { + if (!jarFile.exists()) { + throw new IOException("Jar file: " + jarFile.getCanonicalPath() + " does not exist."); + } + if (!jarFile.getName().endsWith(".jar")) { + throw new IllegalArgumentException("File: " + jarFile.getCanonicalPath() + " is not a .jar " + + "file."); + } + // Get a qualified path for the jar. + FileSystem fileSystem = FileSystem.getLocal(conf); + Path jarPath = new Path(jarFile.getCanonicalPath()); + String qualifiedPath = jarPath.makeQualified(fileSystem).toString(); + // Add the jar to the configuration variable. + String jarConfiguration = conf.get(TMPJARS_KEY, ""); + if (!jarConfiguration.isEmpty()) { + jarConfiguration += ","; + } + jarConfiguration += qualifiedPath; + conf.set(TMPJARS_KEY, jarConfiguration); + } + + /** + * Adds the jar at the specified path to the distributed cache of jobs using the provided + * configuration. The jar will be placed on the classpath of tasks run by the job. + * + * @param conf The configuration used to add the jar to the distributed cache. + * @param jarFile The path to the jar file to add to the distributed cache. + * @throws IOException If the jar file does not exist or there is a problem accessing the file. + */ + public static void addJarToDistributedCache(Configuration conf, String jarFile) + throws IOException { + addJarToDistributedCache(conf, new File(jarFile)); + } + + /** + * Finds the path to a jar that contains the class provided, if any. There is no guarantee that + * the jar returned will be the first on the classpath to contain the file. This method is + * basically lifted out of Hadoop's {@link org.apache.hadoop.mapred.JobConf} class. + * + * @param jarClass The class the jar file should contain. + * @return The path to a jar file that contains the class, or null if no such jar + * exists. + * @throws IOException If there is a problem searching for the jar file. 
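+   *
+   * <p>For example, to locate the jar that ships Hadoop's {@code Text} class:
+   * <pre>{@code
+   *   String jar = DistCache.findContainingJar(org.apache.hadoop.io.Text.class);
+   * }</pre>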
+ */ + public static String findContainingJar(Class jarClass) throws IOException { + ClassLoader loader = jarClass.getClassLoader(); + String classFile = jarClass.getName().replaceAll("\\.", "/") + ".class"; + for(Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) { + URL url = (URL) itr.nextElement(); + if ("jar".equals(url.getProtocol())) { + String toReturn = url.getPath(); + if (toReturn.startsWith("file:")) { + toReturn = toReturn.substring("file:".length()); + } + // URLDecoder is a misnamed class, since it actually decodes + // x-www-form-urlencoded MIME type rather than actual + // URL encoding (which the file path has). Therefore it would + // decode +s to ' 's which is incorrect (spaces are actually + // either unencoded or encoded as "%20"). Replace +s first, so + // that they are kept sacred during the decoding process. + toReturn = toReturn.replaceAll("\\+", "%2B"); + toReturn = URLDecoder.decode(toReturn, "UTF-8"); + return toReturn.replaceAll("!.*$", ""); + } + } + return null; + } + + /** + * Adds all jars under the specified directory to the distributed cache of jobs using the + * provided configuration. The jars will be placed on the classpath of tasks run by the job. + * This method does not descend into subdirectories when adding jars. + * + * @param conf The configuration used to add jars to the distributed cache. + * @param jarDirectory A directory containing jar files to add to the distributed cache. + * @throws IOException If the directory does not exist or there is a problem accessing the + * directory. + */ + public static void addJarDirToDistributedCache(Configuration conf, File jarDirectory) + throws IOException { + if (!jarDirectory.exists() || !jarDirectory.isDirectory()) { + throw new IOException("Jar directory: " + jarDirectory.getCanonicalPath() + " does not " + + "exist or is not a directory."); + } + for (File file : jarDirectory.listFiles()) { + if (!file.isDirectory() && file.getName().endsWith(".jar")) { + addJarToDistributedCache(conf, file); + } + } + } + + /** + * Adds all jars under the directory at the specified path to the distributed cache of jobs + * using the provided configuration. The jars will be placed on the classpath of the tasks + * run by the job. This method does not descend into subdirectories when adding jars. + * + * @param conf The configuration used to add jars to the distributed cache. + * @param jarDirectory The path to a directory containing jar files to add to the distributed + * cache. + * @throws IOException If the directory does not exist or there is a problem accessing the + * directory. + */ + public static void addJarDirToDistributedCache(Configuration conf, String jarDirectory) + throws IOException { + addJarDirToDistributedCache(conf, new File(jarDirectory)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/util/PTypes.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/util/PTypes.java b/crunch/src/main/java/org/apache/crunch/util/PTypes.java new file mode 100644 index 0000000..863b40f --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/util/PTypes.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.util; + +import java.math.BigInteger; +import java.nio.ByteBuffer; + +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.smile.SmileFactory; + +import org.apache.crunch.MapFn; +import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; + +/** + * Utility functions for creating common types of derived PTypes, e.g., for JSON data, + * protocol buffers, and Thrift records. + * + */ +public class PTypes { + + public static PType bigInt(PTypeFamily typeFamily) { + return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes()); + } + + public static PType jsonString(Class clazz, PTypeFamily typeFamily) { + return typeFamily.derived(clazz, new JacksonInputMapFn(clazz), + new JacksonOutputMapFn(), typeFamily.strings()); + } + + public static PType smile(Class clazz, PTypeFamily typeFamily) { + return typeFamily.derived(clazz, new SmileInputMapFn(clazz), + new SmileOutputMapFn(), typeFamily.bytes()); + } + + public static PType protos(Class clazz, PTypeFamily typeFamily) { + return typeFamily.derived(clazz, new ProtoInputMapFn(clazz), + new ProtoOutputMapFn(), typeFamily.bytes()); + } + + public static PType thrifts(Class clazz, PTypeFamily typeFamily) { + return typeFamily.derived(clazz, new ThriftInputMapFn(clazz), + new ThriftOutputMapFn(), typeFamily.bytes()); + } + + public static MapFn BYTE_TO_BIGINT = new MapFn() { + public BigInteger map(ByteBuffer input) { + return input == null ? null : new BigInteger(input.array()); + } + }; + + public static MapFn BIGINT_TO_BYTE = new MapFn() { + public ByteBuffer map(BigInteger input) { + return input == null ? 
null : ByteBuffer.wrap(input.toByteArray()); + } + }; + + public static class SmileInputMapFn extends MapFn { + + private final Class clazz; + private transient ObjectMapper mapper; + + public SmileInputMapFn(Class clazz) { + this.clazz = clazz; + } + + @Override + public void initialize() { + this.mapper = new ObjectMapper(new SmileFactory()); + } + + @Override + public T map(ByteBuffer input) { + try { + return mapper.readValue(input.array(), input.position(), input.limit(), clazz); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + } + + public static class SmileOutputMapFn extends MapFn { + private transient ObjectMapper mapper; + + @Override + public void initialize() { + this.mapper = new ObjectMapper(new SmileFactory()); + } + + @Override + public ByteBuffer map(T input) { + try { + return ByteBuffer.wrap(mapper.writeValueAsBytes(input)); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + } + + public static class JacksonInputMapFn extends MapFn { + + private final Class clazz; + private transient ObjectMapper mapper; + + public JacksonInputMapFn(Class clazz) { + this.clazz = clazz; + } + + @Override + public void initialize() { + this.mapper = new ObjectMapper(); + } + + @Override + public T map(String input) { + try { + return mapper.readValue(input, clazz); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + } + + public static class JacksonOutputMapFn extends MapFn { + + private transient ObjectMapper mapper; + + @Override + public void initialize() { + this.mapper = new ObjectMapper(); + } + + @Override + public String map(T input) { + try { + return mapper.writeValueAsString(input); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + } + + public static class ProtoInputMapFn extends MapFn { + + private final Class clazz; + private transient T instance; + + public ProtoInputMapFn(Class clazz) { + this.clazz = clazz; + } + + @Override + public void initialize() { + this.instance = ReflectionUtils.newInstance(clazz, getConfiguration()); + } + + @Override + public T map(ByteBuffer bb) { + try { + return (T) instance.newBuilderForType().mergeFrom( + bb.array(), bb.position(), bb.limit()).build(); + } catch (InvalidProtocolBufferException e) { + throw new CrunchRuntimeException(e); + } + } + } + + public static class ProtoOutputMapFn extends MapFn { + + public ProtoOutputMapFn() { + } + + @Override + public ByteBuffer map(T proto) { + return ByteBuffer.wrap(proto.toByteArray()); + } + } + + public static class ThriftInputMapFn extends MapFn { + + private final Class clazz; + private transient T instance; + private transient TDeserializer deserializer; + private transient byte[] bytes; + + public ThriftInputMapFn(Class clazz) { + this.clazz = clazz; + } + + @Override + public void initialize() { + this.instance = ReflectionUtils.newInstance(clazz, getConfiguration()); + this.deserializer = new TDeserializer(new TBinaryProtocol.Factory()); + this.bytes = new byte[0]; + } + + @Override + public T map(ByteBuffer bb) { + T next = (T) instance.deepCopy(); + int len = bb.limit() - bb.position(); + if (len != bytes.length) { + bytes = new byte[len]; + } + System.arraycopy(bb.array(), bb.position(), bytes, 0, len); + try { + deserializer.deserialize(next, bytes); + } catch (TException e) { + throw new CrunchRuntimeException(e); + } + return next; + } + } + + public static class ThriftOutputMapFn extends MapFn { + + private transient TSerializer serializer; + + public ThriftOutputMapFn() { + } + + 
@Override + public void initialize() { + this.serializer = new TSerializer(new TBinaryProtocol.Factory()); + } + + @Override + public ByteBuffer map(T t) { + try { + return ByteBuffer.wrap(serializer.serialize(t)); + } catch (TException e) { + throw new CrunchRuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/util/Protos.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/util/Protos.java b/crunch/src/main/java/org/apache/crunch/util/Protos.java new file mode 100644 index 0000000..2cda492 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/util/Protos.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.util; + +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.util.ReflectionUtils; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; +import com.google.common.base.Splitter; +import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Message.Builder; + +/** + * Utility functions for working with protocol buffers in Crunch. 
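+ *
+ * <p>A sketch of typical use; the {@code Person} message and its {@code name} field are
+ * hypothetical:
+ * <pre>{@code
+ *   DoFn<String, Person> parse = Protos.lineParser("\t", Person.class);
+ *   MapFn<Person, String> byName = Protos.extractKey("name");
+ * }</pre>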
+ */ +public class Protos { + + public static MapFn extractKey(String fieldName) { + return new ExtractKeyFn(fieldName); + } + + public static DoFn lineParser(String sep, Class msgClass) { + return new TextToProtoFn(sep, msgClass); + } + + public static class ExtractKeyFn extends MapFn { + + private final String fieldName; + + private transient FieldDescriptor fd; + + public ExtractKeyFn(String fieldName) { + this.fieldName = fieldName; + } + + @Override + public K map(M input) { + if (input == null) { + throw new IllegalArgumentException("Null inputs not supported by Protos.ExtractKeyFn"); + } else if (fd == null) { + fd = input.getDescriptorForType().findFieldByName(fieldName); + if (fd == null) { + throw new IllegalStateException( + "Could not find field: " + fieldName + " in message: " + input); + } + } + return (K) input.getField(fd); + } + + } + + public static class TextToProtoFn extends DoFn { + + private final String sep; + private final Class msgClass; + + private transient M msgInstance; + private transient List fields; + private transient Splitter splitter; + + enum ParseErrors { TOTAL, NUMBER_FORMAT }; + + public TextToProtoFn(String sep, Class msgClass) { + this.sep = sep; + this.msgClass = msgClass; + } + + @Override + public void initialize() { + this.msgInstance = ReflectionUtils.newInstance(msgClass, getConfiguration()); + this.fields = msgInstance.getDescriptorForType().getFields(); + this.splitter = Splitter.on(sep); + } + + @Override + public void process(String input, Emitter emitter) { + if (input != null && !input.isEmpty()) { + Builder b = msgInstance.newBuilderForType(); + Iterator iter = splitter.split(input).iterator(); + boolean parseError = false; + for (FieldDescriptor fd : fields) { + if (iter.hasNext()) { + String value = iter.next(); + if (value != null && !value.isEmpty()) { + Object parsedValue = null; + try { + switch (fd.getJavaType()) { + case STRING: + parsedValue = value; + break; + case INT: + parsedValue = Integer.valueOf(value); + break; + case LONG: + parsedValue = Long.valueOf(value); + break; + case FLOAT: + parsedValue = Float.valueOf(value); + break; + case DOUBLE: + parsedValue = Double.valueOf(value); + break; + case BOOLEAN: + parsedValue = Boolean.valueOf(value); + break; + case ENUM: + parsedValue = fd.getEnumType().findValueByName(value); + break; + } + b.setField(fd, parsedValue); + } catch (NumberFormatException nfe) { + increment(ParseErrors.NUMBER_FORMAT); + parseError = true; + break; + } + } + } + } + + if (parseError) { + increment(ParseErrors.TOTAL); + } else { + emitter.emit((M) b.build()); + } + } + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/util/Tuples.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/util/Tuples.java b/crunch/src/main/java/org/apache/crunch/util/Tuples.java new file mode 100644 index 0000000..b8eb3b9 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/util/Tuples.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.util; + +import java.util.Iterator; +import java.util.List; + +import org.apache.crunch.Pair; +import org.apache.crunch.Tuple3; +import org.apache.crunch.Tuple4; +import org.apache.crunch.TupleN; +import com.google.common.collect.Lists; +import com.google.common.collect.UnmodifiableIterator; + +/** + * Utilities for working with subclasses of the {@code Tuple} interface. + * + */ +public class Tuples { + + private static abstract class TuplifyIterator extends UnmodifiableIterator { + protected List> iterators; + + public TuplifyIterator(Iterator...iterators) { + this.iterators = Lists.newArrayList(iterators); + } + + @Override + public boolean hasNext() { + for (Iterator iter : iterators) { + if (!iter.hasNext()) { + return false; + } + } + return true; + } + + protected Object next(int index) { + return iterators.get(index).next(); + } + } + + public static class PairIterable implements Iterable> { + private final Iterable first; + private final Iterable second; + + public PairIterable(Iterable first, Iterable second) { + this.first = first; + this.second = second; + } + + @Override + public Iterator> iterator() { + return new TuplifyIterator>(first.iterator(), second.iterator()) { + @Override + public Pair next() { + return Pair.of((S) next(0), (T) next(1)); + } + }; + } + } + + public static class TripIterable implements Iterable> { + private final Iterable first; + private final Iterable second; + private final Iterable third; + + public TripIterable(Iterable first, Iterable second, Iterable third) { + this.first = first; + this.second = second; + this.third = third; + } + + @Override + public Iterator> iterator() { + return new TuplifyIterator>(first.iterator(), second.iterator(), + third.iterator()) { + @Override + public Tuple3 next() { + return new Tuple3((A) next(0), (B) next(1), (C) next(2)); + } + }; + } + } + + public static class QuadIterable implements Iterable> { + private final Iterable first; + private final Iterable second; + private final Iterable third; + private final Iterable fourth; + + public QuadIterable(Iterable first, Iterable second, Iterable third, + Iterable fourth) { + this.first = first; + this.second = second; + this.third = third; + this.fourth = fourth; + } + + @Override + public Iterator> iterator() { + return new TuplifyIterator>(first.iterator(), second.iterator(), + third.iterator(), fourth.iterator()) { + @Override + public Tuple4 next() { + return new Tuple4((A) next(0), (B) next(1), (C) next(2), (D) next(3)); + } + }; + } + } + + public static class TupleNIterable implements Iterable { + private final Iterator[] iters; + + public TupleNIterable(Iterable... 
iterables) {
+      this.iters = new Iterator[iterables.length];
+      for (int i = 0; i < iters.length; i++) {
+        iters[i] = iterables[i].iterator();
+      }
+    }
+
+    @Override
+    public Iterator iterator() {
+      return new TuplifyIterator(iters) {
+        @Override
+        public TupleN next() {
+          Object[] values = new Object[iters.length];
+          for (int i = 0; i < values.length; i++) {
+            values[i] = next(i);
+          }
+          return new TupleN(values);
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
new file mode 100644
index 0000000..6756dbb
--- /dev/null
+++ b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class encapsulates a MapReduce job and its dependencies. It monitors the
+ * states of the depending jobs and updates the state of this job accordingly. A
+ * job starts in the WAITING state. If it does not have any depending jobs, or if
+ * all of its depending jobs are in the SUCCESS state, the job state becomes READY.
+ * If any depending job fails, the job fails too. When in the READY state, the job
+ * can be submitted to Hadoop for execution, with the state changing to RUNNING.
+ * From RUNNING, the job can move to the SUCCESS or FAILED state, depending on the
+ * status of the job execution.
+ */
+public class CrunchControlledJob {
+
+  // A job will be in one of the following states
+  public static enum State {
+    SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED
+  };
+
+  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
+  protected State state;
+  protected Job job; // mapreduce job to be executed.
+  // some info for human consumption, e.g. the reason why the job failed
+  protected String message;
+  private String controlID; // assigned and used by JobControl class
+  // the jobs the current job depends on
+  private List<CrunchControlledJob> dependingJobs;
+
+  /**
+   * Construct a job.
+   *
+   * @param job
+   *          a mapreduce job to be executed.
+   * @param dependingJobs
+   *          a list of jobs the current job depends on
+   */
+  public CrunchControlledJob(Job job, List<CrunchControlledJob> dependingJobs)
+      throws IOException {
+    this.job = job;
+    this.dependingJobs = dependingJobs;
+    this.state = State.WAITING;
+    this.controlID = "unassigned";
+    this.message = "just initialized";
+  }
+
+  /**
+   * Construct a job.
+   *
+   * @param conf
+   *          mapred job configuration representing a job to be executed.
+   * @throws IOException
+   */
+  public CrunchControlledJob(Configuration conf) throws IOException {
+    this(new Job(conf), null);
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("job name:\t").append(this.job.getJobName()).append("\n");
+    sb.append("job id:\t").append(this.controlID).append("\n");
+    sb.append("job state:\t").append(this.state).append("\n");
+    sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
+    sb.append("job message:\t").append(this.message).append("\n");
+
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+      sb.append("job has no depending job:\t").append("\n");
+    } else {
+      sb.append("job has ").append(this.dependingJobs.size())
+          .append(" depending jobs:\n");
+      for (int i = 0; i < this.dependingJobs.size(); i++) {
+        sb.append("\t depending job ").append(i).append(":\t");
+        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
+   * @return the job name of this job
+   */
+  public String getJobName() {
+    return job.getJobName();
+  }
+
+  /**
+   * Set the job name for this job.
+   *
+   * @param jobName
+   *          the job name
+   */
+  public void setJobName(String jobName) {
+    job.setJobName(jobName);
+  }
+
+  /**
+   * @return the job ID of this job assigned by JobControl
+   */
+  public String getJobID() {
+    return this.controlID;
+  }
+
+  /**
+   * Set the job ID for this job.
+   *
+   * @param id
+   *          the job ID
+   */
+  public void setJobID(String id) {
+    this.controlID = id;
+  }
+
+  /**
+   * @return the mapred ID of this job as assigned by the mapred framework.
+   */
+  public JobID getMapredJobID() {
+    return this.job.getJobID();
+  }
+
+  /**
+   * @return the mapreduce job
+   */
+  public synchronized Job getJob() {
+    return this.job;
+  }
+
+  /**
+   * Set the mapreduce job.
+   *
+   * @param job
+   *          the mapreduce job for this job.
+   */
+  public synchronized void setJob(Job job) {
+    this.job = job;
+  }
+
+  /**
+   * @return the state of this job
+   */
+  public synchronized State getJobState() {
+    return this.state;
+  }
+
+  /**
+   * Set the state for this job.
+   *
+   * @param state
+   *          the new state for this job.
+   */
+  protected synchronized void setJobState(State state) {
+    this.state = state;
+  }
+
+  /**
+   * @return the message of this job
+   */
+  public synchronized String getMessage() {
+    return this.message;
+  }
+
+  /**
+   * Set the message for this job.
+   *
+   * @param message
+   *          the message for this job.
+   */
+  public synchronized void setMessage(String message) {
+    this.message = message;
+  }
+
+  /**
+   * @return the depending jobs of this job
+   */
+  public List<CrunchControlledJob> getDependentJobs() {
+    return this.dependingJobs;
+  }
+
+  /**
+   * Add a job to this job's dependency list. Dependent jobs can only be added
+   * while a Job is waiting to run, not during or afterwards.
+   *
+   * @param dependingJob
+   *          Job that this Job depends on.
+   * @return true if the Job was added.
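+   *         For example (with hypothetical jobs), {@code reduceJob.addDependingJob(mapJob)}
+   *         keeps {@code reduceJob} from being submitted until {@code mapJob} succeeds.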
+ */ + public synchronized boolean addDependingJob(CrunchControlledJob dependingJob) { + if (this.state == State.WAITING) { // only allowed to add jobs when waiting + if (this.dependingJobs == null) { + this.dependingJobs = new ArrayList(); + } + return this.dependingJobs.add(dependingJob); + } else { + return false; + } + } + + /** + * @return true if this job is in a complete state + */ + public synchronized boolean isCompleted() { + return this.state == State.FAILED || this.state == State.DEPENDENT_FAILED + || this.state == State.SUCCESS; + } + + /** + * @return true if this job is in READY state + */ + public synchronized boolean isReady() { + return this.state == State.READY; + } + + public void killJob() throws IOException, InterruptedException { + job.killJob(); + } + + /** + * Check the state of this running job. The state may remain the same, become + * SUCCESS or FAILED. + */ + protected void checkRunningState() throws IOException, InterruptedException { + try { + if (job.isComplete()) { + if (job.isSuccessful()) { + this.state = State.SUCCESS; + } else { + this.state = State.FAILED; + this.message = "Job failed!"; + } + } + } catch (IOException ioe) { + this.state = State.FAILED; + this.message = StringUtils.stringifyException(ioe); + try { + if (job != null) { + job.killJob(); + } + } catch (IOException e) { + } + } + } + + /** + * Check and update the state of this job. The state changes depending on its + * current state and the states of the depending jobs. + */ + synchronized State checkState() throws IOException, InterruptedException { + if (this.state == State.RUNNING) { + checkRunningState(); + } + if (this.state != State.WAITING) { + return this.state; + } + if (this.dependingJobs == null || this.dependingJobs.size() == 0) { + this.state = State.READY; + return this.state; + } + CrunchControlledJob pred = null; + int n = this.dependingJobs.size(); + for (int i = 0; i < n; i++) { + pred = this.dependingJobs.get(i); + State s = pred.checkState(); + if (s == State.WAITING || s == State.READY || s == State.RUNNING) { + break; // a pred is still not completed, continue in WAITING + // state + } + if (s == State.FAILED || s == State.DEPENDENT_FAILED) { + this.state = State.DEPENDENT_FAILED; + this.message = "depending job " + i + " with jobID " + pred.getJobID() + + " failed. " + pred.getMessage(); + break; + } + // pred must be in success state + if (i == n - 1) { + this.state = State.READY; + } + } + + return this.state; + } + + /** + * Submit this job to mapred. The state becomes RUNNING if submission is + * successful, FAILED otherwise. 
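+   * If the {@code mapreduce.jobcontrol.createdir.ifnotexist} flag is set in the job's
+   * configuration, missing input directories are created before submission.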
+   */
+  protected synchronized void submit() {
+    try {
+      Configuration conf = job.getConfiguration();
+      if (conf.getBoolean(CREATE_DIR, false)) {
+        FileSystem fs = FileSystem.get(conf);
+        Path[] inputPaths = FileInputFormat.getInputPaths(job);
+        for (int i = 0; i < inputPaths.length; i++) {
+          if (!fs.exists(inputPaths[i])) {
+            try {
+              fs.mkdirs(inputPaths[i]);
+            } catch (IOException e) {
+            }
+          }
+        }
+      }
+      job.submit();
+      this.state = State.RUNNING;
+    } catch (Exception ioe) {
+      this.state = State.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
new file mode 100644
index 0000000..7fa61d3
--- /dev/null
+++ b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
+
+/**
+ * This class encapsulates a set of MapReduce jobs and their dependencies.
+ *
+ * It tracks the states of the jobs by placing them into different tables
+ * according to their states.
+ *
+ * This class provides APIs for the client app to add a job to the group and to
+ * get the jobs in the group in different states. When a job is added, an ID
+ * unique to the group is assigned to the job.
+ *
+ * This class has a thread that submits jobs when they become ready, monitors
+ * the states of the running jobs, and updates the states of jobs based on the
+ * state changes of the jobs they depend on. The class provides APIs for
+ * suspending/resuming the thread, and for stopping the thread.
+ *
+ * TODO This is mostly a copy of the JobControl class in Hadoop MapReduce core.
+ * Once the location and interface of the class are more stable in CDH, this class
+ * should be removed completely and be based on the hadoop-core class.
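+ *
+ * <p>A minimal driver sketch (job construction and interrupt handling omitted):
+ * <pre>{@code
+ *   CrunchJobControl control = new CrunchJobControl("my-group");
+ *   control.addJob(controlledJob);
+ *   new Thread(control).start();
+ *   while (!control.allFinished()) {
+ *     Thread.sleep(500);
+ *   }
+ *   control.stop();
+ * }</pre>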
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
new file mode 100644
index 0000000..7fa61d3
--- /dev/null
+++ b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
+
+/**
+ * This class encapsulates a set of MapReduce jobs and their dependencies.
+ *
+ * It tracks the states of the jobs by placing them into different tables
+ * according to their states.
+ *
+ * This class provides APIs for the client app to add a job to the group and to
+ * get the jobs in the group in different states. When a job is added, an ID
+ * unique to the group is assigned to the job.
+ *
+ * This class has a thread that submits jobs when they become ready, monitors
+ * the states of the running jobs, and updates the states of jobs based on the
+ * state changes of the jobs they depend on. The class provides APIs for
+ * suspending/resuming the thread, and for stopping the thread.
+ *
+ * TODO This is mostly a copy of the JobControl class in Hadoop MapReduce core.
+ * Once the location and interface of the class are more stable in CDH, this
+ * class should be removed completely and be based on the hadoop-core class.
+ */
+public class CrunchJobControl implements Runnable {
+
+  // The thread can be in one of the following states
+  public static enum ThreadState {
+    RUNNING, SUSPENDED, STOPPED, STOPPING, READY
+  }
+
+  private ThreadState runnerState; // the thread state
+
+  private Map<String, CrunchControlledJob> waitingJobs;
+  private Map<String, CrunchControlledJob> readyJobs;
+  private Map<String, CrunchControlledJob> runningJobs;
+  private Map<String, CrunchControlledJob> successfulJobs;
+  private Map<String, CrunchControlledJob> failedJobs;
+
+  private long nextJobID;
+  private String groupName;
+
+  /**
+   * Construct a job control for a group of jobs.
+   *
+   * @param groupName
+   *          a name identifying this group
+   */
+  public CrunchJobControl(String groupName) {
+    this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
+    this.readyJobs = new Hashtable<String, CrunchControlledJob>();
+    this.runningJobs = new Hashtable<String, CrunchControlledJob>();
+    this.successfulJobs = new Hashtable<String, CrunchControlledJob>();
+    this.failedJobs = new Hashtable<String, CrunchControlledJob>();
+    this.nextJobID = -1;
+    this.groupName = groupName;
+    this.runnerState = ThreadState.READY;
+  }
+
+  private static List<CrunchControlledJob> toList(
+      Map<String, CrunchControlledJob> jobs) {
+    ArrayList<CrunchControlledJob> retv = new ArrayList<CrunchControlledJob>();
+    synchronized (jobs) {
+      for (CrunchControlledJob job : jobs.values()) {
+        retv.add(job);
+      }
+    }
+    return retv;
+  }
+
+  /**
+   * @return the jobs in the waiting state
+   */
+  public List<CrunchControlledJob> getWaitingJobList() {
+    return toList(this.waitingJobs);
+  }
+
+  /**
+   * @return the jobs in the running state
+   */
+  public List<CrunchControlledJob> getRunningJobList() {
+    return toList(this.runningJobs);
+  }
+
+  /**
+   * @return the jobs in the ready state
+   */
+  public List<CrunchControlledJob> getReadyJobsList() {
+    return toList(this.readyJobs);
+  }
+
+  /**
+   * @return the jobs in the success state
+   */
+  public List<CrunchControlledJob> getSuccessfulJobList() {
+    return toList(this.successfulJobs);
+  }
+
+  /**
+   * @return the jobs in the failed state
+   */
+  public List<CrunchControlledJob> getFailedJobList() {
+    return toList(this.failedJobs);
+  }
+
+  private String getNextJobID() {
+    nextJobID += 1;
+    return this.groupName + this.nextJobID;
+  }
+
+  private static void addToQueue(CrunchControlledJob aJob,
+      Map<String, CrunchControlledJob> queue) {
+    synchronized (queue) {
+      queue.put(aJob.getJobID(), aJob);
+    }
+  }
+
+  private void addToQueue(CrunchControlledJob aJob) {
+    Map<String, CrunchControlledJob> queue = getQueue(aJob.getJobState());
+    addToQueue(aJob, queue);
+  }
+
+  private Map<String, CrunchControlledJob> getQueue(State state) {
+    Map<String, CrunchControlledJob> retv = null;
+    if (state == State.WAITING) {
+      retv = this.waitingJobs;
+    } else if (state == State.READY) {
+      retv = this.readyJobs;
+    } else if (state == State.RUNNING) {
+      retv = this.runningJobs;
+    } else if (state == State.SUCCESS) {
+      retv = this.successfulJobs;
+    } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
+      retv = this.failedJobs;
+    }
+    return retv;
+  }
+
+  /**
+   * Add a new job.
+   *
+   * @param aJob
+   *          the new job
+   * @return the ID assigned to the job within this group
+   */
+  public synchronized String addJob(CrunchControlledJob aJob) {
+    String id = this.getNextJobID();
+    aJob.setJobID(id);
+    aJob.setJobState(State.WAITING);
+    this.addToQueue(aJob);
+    return id;
+  }
+
+  /**
+   * Add a collection of jobs.
+   *
+   * @param jobs
+   *          the jobs to add
+   */
+  public void addJobCollection(Collection<CrunchControlledJob> jobs) {
+    for (CrunchControlledJob job : jobs) {
+      addJob(job);
+    }
+  }
+
+  /**
+   * @return the thread state
+   */
+  public ThreadState getThreadState() {
+    return this.runnerState;
+  }
+
+  /**
+   * Set the thread state to STOPPING so that the thread will stop when it
+   * wakes up.
+   */
+  public void stop() {
+    this.runnerState = ThreadState.STOPPING;
+  }
+
+  /**
+   * Suspend the running thread.
+   */
+  public void suspend() {
+    if (this.runnerState == ThreadState.RUNNING) {
+      this.runnerState = ThreadState.SUSPENDED;
+    }
+  }
+
+  /**
+   * Resume the suspended thread.
+   */
+  public void resume() {
+    if (this.runnerState == ThreadState.SUSPENDED) {
+      this.runnerState = ThreadState.RUNNING;
+    }
+  }
+
+  private synchronized void checkRunningJobs() throws IOException,
+      InterruptedException {
+    Map<String, CrunchControlledJob> oldJobs = this.runningJobs;
+    this.runningJobs = new Hashtable<String, CrunchControlledJob>();
+
+    for (CrunchControlledJob nextJob : oldJobs.values()) {
+      nextJob.checkState();
+      this.addToQueue(nextJob);
+    }
+  }
+
+  private synchronized void checkWaitingJobs() throws IOException,
+      InterruptedException {
+    Map<String, CrunchControlledJob> oldJobs = this.waitingJobs;
+    this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
+
+    for (CrunchControlledJob nextJob : oldJobs.values()) {
+      nextJob.checkState();
+      this.addToQueue(nextJob);
+    }
+  }
+
+  private synchronized void startReadyJobs() {
+    Map<String, CrunchControlledJob> oldJobs = this.readyJobs;
+    this.readyJobs = new Hashtable<String, CrunchControlledJob>();
+
+    for (CrunchControlledJob nextJob : oldJobs.values()) {
+      // Submit the job to Hadoop
+      nextJob.submit();
+      this.addToQueue(nextJob);
+    }
+  }
+
+  public synchronized boolean allFinished() {
+    return this.waitingJobs.isEmpty() && this.readyJobs.isEmpty()
+        && this.runningJobs.isEmpty();
+  }
+
+  /**
+   * The main loop for the thread. On each iteration it checks the states of
+   * the running jobs, updates the states of the waiting jobs, and submits the
+   * jobs in the ready state.
+   */
+  @Override
+  public void run() {
+    this.runnerState = ThreadState.RUNNING;
+    while (true) {
+      while (this.runnerState == ThreadState.SUSPENDED) {
+        try {
+          Thread.sleep(5000);
+        } catch (Exception e) {
+          // interrupted while sleeping; loop and re-check the thread state
+        }
+      }
+      try {
+        checkRunningJobs();
+        checkWaitingJobs();
+        startReadyJobs();
+      } catch (Exception e) {
+        this.runnerState = ThreadState.STOPPED;
+      }
+      if (this.runnerState != ThreadState.RUNNING
+          && this.runnerState != ThreadState.SUSPENDED) {
+        break;
+      }
+      try {
+        Thread.sleep(5000);
+      } catch (Exception e) {
+        // interrupted while sleeping; loop and re-check the thread state
+      }
+      if (this.runnerState != ThreadState.RUNNING
+          && this.runnerState != ThreadState.SUSPENDED) {
+        break;
+      }
+    }
+    this.runnerState = ThreadState.STOPPED;
+  }
+
+}
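For reference, a minimal sketch of driving this class; every method used here
(the constructor, addJob, allFinished, stop, and the Runnable contract) appears
in the code above, while the group name and the two CrunchControlledJob
variables are illustrative assumptions, and InterruptedException handling is
left to the caller:

    CrunchJobControl control = new CrunchJobControl("example-group");
    control.addJob(firstJob);   // CrunchControlledJob instances
    control.addJob(secondJob);
    Thread runner = new Thread(control);
    runner.setDaemon(true);     // so a stuck control thread cannot block JVM exit
    runner.start();
    while (!control.allFinished()) {
      Thread.sleep(500);        // poll until all jobs have left the queues
    }
    control.stop();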
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
new file mode 100644
index 0000000..10d033f
--- /dev/null
+++ b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.crunch.impl.mr.run.TaskAttemptContextFactory;
+
+/**
+ * The MultipleOutputs class simplifies writing output data
+ * to multiple outputs.
+ *
+ * <p>
+ * Case one: writing to additional outputs other than the job default output.
+ *
+ * Each additional output, or named output, may be configured with its own
+ * <code>OutputFormat</code>, with its own key class and with its own value
+ * class.
+ *
+ * <p>
+ * Case two: writing data to different files provided by the user.
+ * </p>
+ *
+ * <p>
+ * MultipleOutputs supports counters, which are disabled by default. The
+ * counters group is the {@link CrunchMultipleOutputs} class name. The names of
+ * the counters are the same as the output name. These count the number of
+ * records written to each output name.
+ * </p>
+ *
+ * Usage pattern for job submission:
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ * FileOutputFormat.setOutputPath(job, outDir);
+ *
+ * job.setMapperClass(MOMap.class);
+ * job.setReducerClass(MOReduce.class);
+ * ...
+ *
+ * // Defines additional text-based output 'text' for the job
+ * CrunchMultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
+ *   LongWritable.class, Text.class);
+ *
+ * // Defines additional sequence-file-based output 'seq' for the job
+ * CrunchMultipleOutputs.addNamedOutput(job, "seq",
+ *   SequenceFileOutputFormat.class,
+ *   LongWritable.class, Text.class);
+ * ...
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ *
+ * <p>
+ * Usage in Reducer:
+ * <pre>
+ * String generateFileName(K k, V v) {
+ *   return k.toString() + "_" + v.toString();
+ * }
+ * 
+ * public class MOReduce extends
+ *   Reducer&lt;WritableComparable, Writable, WritableComparable, Writable&gt; {
+ * private CrunchMultipleOutputs mos;
+ * public void setup(Context context) {
+ * ...
+ * mos = new CrunchMultipleOutputs(context);
+ * }
+ *
+ * public void reduce(WritableComparable key, Iterable&lt;Writable&gt; values,
+ * Context context)
+ * throws IOException {
+ * ...
+ * mos.write("text", , key, new Text("Hello"));
+ * mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
+ * mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
+ * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
+ * ...
+ * }
+ *
+ * public void cleanup(Context context) throws IOException, InterruptedException {
+ * mos.close();
+ * ...
+ * }
+ *
+ * }
+ * </pre>
+ */
+public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
+
+  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
+
+  private static final String MO_PREFIX =
+    "mapreduce.multipleoutputs.namedOutput.";
+
+  private static final String FORMAT = ".format";
+  private static final String KEY = ".key";
+  private static final String VALUE = ".value";
+  private static final String COUNTERS_ENABLED =
+    "mapreduce.multipleoutputs.counters";
+
+  private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
+
+  /**
+   * Counters group used by the counters of MultipleOutputs.
+   */
+  private static final String COUNTERS_GROUP =
+    CrunchMultipleOutputs.class.getName();
+
+  /**
+   * Cache for the taskContexts
+   */
+  private Map<String, TaskAttemptContext> taskContexts =
+    new HashMap<String, TaskAttemptContext>();
+
+  /**
+   * Checks if a named output name is a valid token.
+   *
+   * @param namedOutput named output name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkTokenName(String namedOutput) {
+    if (namedOutput == null || namedOutput.length() == 0) {
+      throw new IllegalArgumentException(
+        "Name cannot be NULL or empty");
+    }
+    for (char ch : namedOutput.toCharArray()) {
+      if ((ch >= 'A') && (ch <= 'Z')) {
+        continue;
+      }
+      if ((ch >= 'a') && (ch <= 'z')) {
+        continue;
+      }
+      if ((ch >= '0') && (ch <= '9')) {
+        continue;
+      }
+      throw new IllegalArgumentException(
+        "Name cannot have a '" + ch + "' char");
+    }
+  }
+
+  /**
+   * Checks if an output name is valid.
+   *
+   * The name cannot be the name used for the default output.
+   *
+   * @param outputPath base output name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkBaseOutputPath(String outputPath) {
+    if (outputPath.equals(FileOutputFormat.PART)) {
+      throw new IllegalArgumentException("output name cannot be 'part'");
+    }
+  }
+
+  /**
+   * Checks if a named output name is valid.
+   *
+   * @param namedOutput named output name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkNamedOutputName(JobContext job,
+      String namedOutput, boolean alreadyDefined) {
+    checkTokenName(namedOutput);
+    checkBaseOutputPath(namedOutput);
+    List<String> definedChannels = getNamedOutputsList(job);
+    if (alreadyDefined && definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput
+        + "' already defined");
+    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput
+        + "' not defined");
+    }
+  }
+
+  // Returns the list of channel names.
+  private static List<String> getNamedOutputsList(JobContext job) {
+    List<String> names = new ArrayList<String>();
+    StringTokenizer st = new StringTokenizer(
+      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
+    while (st.hasMoreTokens()) {
+      names.add(st.nextToken());
+    }
+    return names;
+  }
+
+  // Returns the named output OutputFormat.
+  @SuppressWarnings("unchecked")
+  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
+      JobContext job, String namedOutput) {
+    return (Class<? extends OutputFormat<?, ?>>)
+      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
+      OutputFormat.class);
+  }
+
+  // Returns the key class for a named output.
+  private static Class<?> getNamedOutputKeyClass(JobContext job,
+      String namedOutput) {
+    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
+      Object.class);
+  }
+
+  // Returns the value class for a named output.
+  private static Class<?> getNamedOutputValueClass(
+      JobContext job, String namedOutput) {
+    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
+      null, Object.class);
+  }
+
+  /**
+   * Adds a named output for the job.
+   * <p>
+   *
+   * @param job               job to add the named output
+   * @param namedOutput       named output name; it has to be a word, letters
+   *                          and numbers only, and cannot be the word 'part'
+   *                          as that is reserved for the default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param keyClass          key class
+   * @param valueClass        value class
+   */
+  public static void addNamedOutput(Job job, String namedOutput,
+      Class<? extends OutputFormat> outputFormatClass,
+      Class<?> keyClass, Class<?> valueClass) {
+    checkNamedOutputName(job, namedOutput, true);
+    Configuration conf = job.getConfiguration();
+    conf.set(MULTIPLE_OUTPUTS,
+      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
+    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
+      OutputFormat.class);
+    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
+    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
+  }
+
+  /**
+   * Enables or disables counters for the named outputs.
+   *
+   * The counters group is the {@link CrunchMultipleOutputs} class name.
+   * The names of the counters are the same as the named outputs. These
+   * counters count the number of records written to each output name.
+   * By default these counters are disabled.
+   *
+   * @param job     job to enable counters for
+   * @param enabled indicates if the counters will be enabled or not.
+   */
+  public static void setCountersEnabled(Job job, boolean enabled) {
+    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
+  }
+
+  /**
+   * Returns if the counters for the named outputs are enabled or not.
+   * By default these counters are disabled.
+   *
+   * @param job the job
+   * @return TRUE if the counters are enabled, FALSE if they are disabled.
+   */
+  public static boolean getCountersEnabled(JobContext job) {
+    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
+  }
+
+  /**
+   * Wraps RecordWriter to increment counters.
+   */
+  @SuppressWarnings("unchecked")
+  private static class RecordWriterWithCounter extends RecordWriter {
+    private RecordWriter writer;
+    private String counterName;
+    private TaskInputOutputContext context;
+
+    public RecordWriterWithCounter(RecordWriter writer, String counterName,
+        TaskInputOutputContext context) {
+      this.writer = writer;
+      this.counterName = counterName;
+      this.context = context;
+    }
+
+    @SuppressWarnings({"unchecked"})
+    public void write(Object key, Object value)
+        throws IOException, InterruptedException {
+      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
+      writer.write(key, value);
+    }
+
+    public void close(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      writer.close(context);
+    }
+  }
+
+  // instance code, to be used from Mapper/Reducer code
+
+  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
+  private Set<String> namedOutputs;
+  private Map<String, RecordWriter<?, ?>> recordWriters;
+  private boolean countersEnabled;
+
+  /**
+   * Creates and initializes multiple outputs support;
+   * it should be instantiated in the Mapper/Reducer setup method.
+   *
+   * @param context the TaskInputOutputContext object
+   */
+  public CrunchMultipleOutputs(
+      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
+    this.context = context;
+    namedOutputs = Collections.unmodifiableSet(
+      new HashSet<String>(CrunchMultipleOutputs.getNamedOutputsList(context)));
+    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
+    countersEnabled = getCountersEnabled(context);
+  }
+
+  /**
+   * Write key and value to the namedOutput.
+   *
+   * Output path is a unique file generated for the namedOutput.
+   * For example, {namedOutput}-(m|r)-{part-number}
+   *
+   * @param namedOutput the named output name
+   * @param key         the key
+   * @param value       the value
+   */
+  @SuppressWarnings("unchecked")
+  public <K, V> void write(String namedOutput, K key, V value)
+      throws IOException, InterruptedException {
+    write(namedOutput, key, value, namedOutput);
+  }
+
+  /**
+   * Write key and value to baseOutputPath using the namedOutput.
+   *
+   * @param namedOutput    the named output name
+   * @param key            the key
+   * @param value          the value
+   * @param baseOutputPath base-output path to write the record to. Note: the
+   *                       framework will generate a unique filename for the
+   *                       baseOutputPath.
+   */
+  @SuppressWarnings("unchecked")
+  public <K, V> void write(String namedOutput, K key, V value,
+      String baseOutputPath) throws IOException, InterruptedException {
+    checkNamedOutputName(context, namedOutput, false);
+    checkBaseOutputPath(baseOutputPath);
+    if (!namedOutputs.contains(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '"
+        + namedOutput + "'");
+    }
+    TaskAttemptContext taskContext = getContext(namedOutput);
+    getRecordWriter(taskContext, baseOutputPath).write(key, value);
+  }
+
+  /**
+   * Write key value to an output file name.
+   *
+   * Gets the record writer from the job's output format.
+   * The job's output format should be a FileOutputFormat.
+   *
+   * @param key            the key
+   * @param value          the value
+   * @param baseOutputPath base-output path to write the record to. Note: the
+   *                       framework will generate a unique filename for the
+   *                       baseOutputPath.
+   */
+  @SuppressWarnings("unchecked")
+  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
+      throws IOException, InterruptedException {
+    checkBaseOutputPath(baseOutputPath);
+    TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
+      context.getConfiguration(), context.getTaskAttemptID());
+    getRecordWriter(taskContext, baseOutputPath).write(key, value);
+  }
+
+  // By being synchronized, MultipleOutputs can be used with a
+  // MultithreadedMapper.
+  @SuppressWarnings("unchecked")
+  private synchronized RecordWriter getRecordWriter(
+      TaskAttemptContext taskContext, String baseFileName)
+      throws IOException, InterruptedException {
+
+    // look for the record writer in the cache
+    RecordWriter writer = recordWriters.get(baseFileName);
+
+    // If not in the cache, create a new one
+    if (writer == null) {
+      // get the record writer from the context's output format
+      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, baseFileName);
+      try {
+        writer = ((OutputFormat) ReflectionUtils.newInstance(
+          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
+          .getRecordWriter(taskContext);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+
+      // if counters are enabled, wrap the writer with context
+      // to increment counters
+      if (countersEnabled) {
+        writer = new RecordWriterWithCounter(writer, baseFileName, context);
+      }
+
+      // add the record writer to the cache
+      recordWriters.put(baseFileName, writer);
+    }
+    return writer;
+  }
+
+  // Create a taskAttemptContext for the named output with
+  // the output format and output key/value types put in the context.
+  private TaskAttemptContext getContext(String nameOutput) throws IOException {
+
+    TaskAttemptContext taskContext = taskContexts.get(nameOutput);
+
+    if (taskContext != null) {
+      return taskContext;
+    }
+
+    // The following trick leverages the instantiation of a record writer via
+    // the job, thus supporting arbitrary output formats.
+    Job job = new Job(context.getConfiguration());
+    job.getConfiguration().set("crunch.namedoutput", nameOutput);
+    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
+    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
+    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
+    taskContext = TaskAttemptContextFactory.create(
+      job.getConfiguration(), context.getTaskAttemptID());
+
+    taskContexts.put(nameOutput, taskContext);
+
+    return taskContext;
+  }
+
+  /**
+   * Closes all the opened outputs.
+   *
+   * This should be called from the cleanup method of a map/reduce task.
+   * If overridden, subclasses must invoke super.close() at the
+   * end of their close().
+   */
+  @SuppressWarnings("unchecked")
+  public void close() throws IOException, InterruptedException {
+    for (RecordWriter writer : recordWriters.values()) {
+      writer.close(context);
+    }
+  }
+}
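A minimal sketch of this class in use from a reducer, following the usage
pattern in the class javadoc above. MyReducer, the Text/IntWritable types, and
the "text" named output are illustrative assumptions; the constructor, write(),
and close() calls match the code in this file:

    public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      private CrunchMultipleOutputs<Text, IntWritable> mos;

      protected void setup(Context context) {
        mos = new CrunchMultipleOutputs<Text, IntWritable>(context);
      }

      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
          sum += v.get();
        }
        // "text" must first be registered on the driver side via
        // CrunchMultipleOutputs.addNamedOutput(job, "text", ...).
        mos.write("text", key, new IntWritable(sum));
      }

      protected void cleanup(Context context)
          throws IOException, InterruptedException {
        mos.close(); // required, otherwise the cached record writers are never closed
      }
    }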