crunch-user mailing list archives

From Gabriel Reid <gabriel.r...@gmail.com>
Subject Re: Error while trying to obtain the top elements.
Date Wed, 08 Jul 2015 12:41:52 GMT
Hi Florin,

The use of Avros that David and I were referring to basically just involves
changing your import statements in your original source file from:

  import static org.apache.crunch.types.writable.Writables.ints;
  import static org.apache.crunch.types.writable.Writables.strings;
  import static org.apache.crunch.types.writable.Writables.tableOf;

to:

  import static org.apache.crunch.types.avro.Avros.ints;
  import static org.apache.crunch.types.avro.Avros.strings;
  import static org.apache.crunch.types.avro.Avros.tableOf;


The specific problem that you seem to be running into now is due to Hadoop
2.7.0 still shipping with Avro 1.7.4, which contains the underlying Avro bugs
that contributed to CRUNCH-360. You should be able to work around that by
swapping out the Avro jar files under share/hadoop/common/lib in your Hadoop
installation (as mentioned in CRUNCH-360).
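
In concrete terms, the jar swap would be something like the following (a
sketch only — the paths and the replacement Avro version here are just
examples on my part; check CRUNCH-360 for the version that actually carries
the fix):

```shell
# Assumes HADOOP_HOME points at your Hadoop 2.7.0 installation, and that you
# have already downloaded a newer Avro jar (1.7.7 here is only an example).
cd "$HADOOP_HOME/share/hadoop/common/lib"
mv avro-1.7.4.jar avro-1.7.4.jar.disabled   # keep the old jar around, disabled
cp /path/to/avro-1.7.7.jar .                # drop in the replacement
```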

About applying the patch on CRUNCH-539, you can do it as follows:
1. Download the patch to your local system
2. Clone the Crunch git repo
    $ git clone https://git-wip-us.apache.org/repos/asf/crunch.git

3. Apply the patch with git
    $ git am path/to/CRUNCH-539.patch

4. Rebuild Crunch
    $ mvn clean install -DskipTests

5. Update the pom in your own project to refer to crunch 0.13-SNAPSHOT, and
rebuild your own project
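
For step 5, the dependency change would look something like this (the
artifact id and exact snapshot version are assumptions — use whatever
version the patched checkout's pom.xml actually declares):

```xml
<dependency>
  <groupId>org.apache.crunch</groupId>
  <artifactId>crunch-core</artifactId>
  <version>0.13.0-SNAPSHOT</version>
</dependency>
```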

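As background on what the patch addresses: as I mentioned earlier in the
thread, the failure comes from how ObjectInputStream interacts with
classloaders. By default, ObjectInputStream.resolveClass resolves class names
against the most recent user-defined classloader on the call stack, not the
thread context classloader that Hadoop installs for job classes, which is why
a job-jar class like TupleWritable can go missing. The usual workaround
pattern looks like this (a minimal, self-contained sketch for illustration,
not the actual CRUNCH-539 patch):

```java
import java.io.*;

// Illustrative sketch only: resolve deserialized class names against the
// thread context classloader first, falling back to the JDK default, which
// uses the latest user-defined classloader on the call stack.
public class ContextLoaderObjectInputStream extends ObjectInputStream {

    public ContextLoaderObjectInputStream(InputStream in) throws IOException {
        super(in);
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        ClassLoader ctx = Thread.currentThread().getContextClassLoader();
        if (ctx != null) {
            try {
                return Class.forName(desc.getName(), false, ctx);
            } catch (ClassNotFoundException e) {
                // fall through to the default resolution below
            }
        }
        return super.resolveClass(desc);
    }

    // Serialize an object and read it back through the stream above.
    public static Object roundTrip(Serializable obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois = new ContextLoaderObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello")); // prints: hello
    }
}
```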

- Gabriel




On Wed, Jul 8, 2015 at 11:22 AM Florin Tatu <tatuflorin@gmail.com> wrote:

> Hi David, Gabriel,
>
> Using Avros I get  org.apache.avro.UnresolvedUnionException: Not in union
>
> The situation is similar to the Crunch example from the Hadoop book (first
> make a clone:
> git clone https://github.com/tomwhite/hadoop-book.git).
> If you run the following commands in hadoop-book/ch18-crunch:
>
> mvn package
> hadoop jar target/ch18-crunch-4.0-job.jar
> crunch.AvroGenericMaxTemperatureCrunch ncdc_data output
>
> you obtain the following exception:
>
> java.lang.Exception: org.apache.avro.UnresolvedUnionException: Not in
> union
> [{"type":"record","name":"WeatherRecord","namespace":"org.apache.avro.mapred","doc":"A
> weather
> reading.","fields":[{"name":"year","type":"int"},{"name":"temperature","type":"int"},{"name":"stationId","type":"string"}]},"null"]:
> {"year": 1928, "temperature": 28, "stationId": "011060-99999"}
> at
> org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
> at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
> Caused by: org.apache.avro.UnresolvedUnionException: Not in union
> [{"type":"record","name":"WeatherRecord","namespace":"org.apache.avro.mapred","doc":"A
> weather
> reading.","fields":[{"name":"year","type":"int"},{"name":"temperature","type":"int"},{"name":"stationId","type":"string"}]},"null"]:
> {"year": 1928, "temperature": 28, "stationId": "011060-99999"}
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:561)
> at
> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:144)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
> at
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
> at
> org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:128)
> at
> org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:113)
> at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1157)
> at
> org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
> at
> org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
> at
> org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
> at org.apache.crunch.impl.mr.emit.OutputEmitter.emit(OutputEmitter.java:41)
> at org.apache.crunch.MapFn.process(MapFn.java:34)
> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
> at
> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
> at
> crunch.AvroGenericMaxTemperatureCrunch$2.process(AvroGenericMaxTemperatureCrunch.java:95)
> at
> crunch.AvroGenericMaxTemperatureCrunch$2.process(AvroGenericMaxTemperatureCrunch.java:82)
> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
> at
> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
> at org.apache.crunch.MapFn.process(MapFn.java:34)
> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
> at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
> at
> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> From what I have researched, it seems to be related to:
> https://issues.apache.org/jira/browse/CRUNCH-360
> (but please correct me if I am wrong).
>
> If this is a known issue, is there another alternative?
>
> PS: @Gabriel Reid: I saw you made a patch for the issue
> https://issues.apache.org/jira/browse/CRUNCH-539
> I am unfamiliar with how to apply it to the project; could you please
> provide me some minimal guidance? :)
>
>
> - Florin
>
> On Tue, Jul 7, 2015 at 11:45 PM, Gabriel Reid <gabriel.reid@gmail.com>
> wrote:
>
>> Hi Florin,
>>
>> Thanks for the very detailed report. That appears to be a bug, brought on
>> by the way that ObjectInputStream works with classloaders, together with
>> how Hadoop manipulates classloaders.
>>
>> I've logged a JIRA ticket [1] for this. For now, like David, I would
>> recommend using Avros instead of Writables, as that should get around this
>> issue without any other consequences.
>>
>> - Gabriel
>>
>> 1. https://issues.apache.org/jira/browse/CRUNCH-539
>>
>>
>> On Tue, Jul 7, 2015 at 3:27 PM David Ortiz <dpo5003@gmail.com> wrote:
>>
>>> That looks weird.  Can you try it using Avros in place of Writables and
>>> see if it does the same thing?
>>>
>>> On Tue, Jul 7, 2015, 3:43 AM Florin Tatu <tatuflorin@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a job that processes a set of files containing climatic
>>>> data (specifically, data from this location:
>>>> ftp://ftp.ncdc.noaa.gov/pub/data/noaa/)
>>>>
>>>> I downloaded and merged the data using a script, so I have one
>>>> folder (ncdc_data) containing a .gz archive for each year (e.g. 1901.gz,
>>>> 1902.gz, etc.)
>>>> Each archive contains only one text file.
>>>>
>>>> My code is:
>>>>
>>>> import com.google.common.base.Charsets;
>>>> import com.google.common.io.Files;
>>>> import org.apache.crunch.*;
>>>> import org.apache.crunch.fn.Aggregators;
>>>> import org.apache.crunch.impl.mr.MRPipeline;
>>>> import org.apache.crunch.io.To;
>>>> import java.io.File;
>>>> import static org.apache.crunch.types.writable.Writables.ints;
>>>> import static org.apache.crunch.types.writable.Writables.strings;
>>>> import static org.apache.crunch.types.writable.Writables.tableOf;
>>>>
>>>> public class MaxTemperatureCrunch {
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>         if (args.length != 2) {
>>>>             System.err.println("Usage: MaxTemperatureCrunch <input
>>>> path> <output path>");
>>>>             System.exit(-1);
>>>>         }
>>>>
>>>>         Pipeline pipeline = new MRPipeline(MaxTemperatureCrunch.class);
>>>>
>>>>         PCollection<String> records = pipeline.readTextFile(args[0]);
>>>>
>>>>         PTable<String, Integer> yearTemperatures = records
>>>>                 .parallelDo(toYearTempPairsFn(), tableOf(strings(),
>>>> ints()));
>>>>
>>>>         PTable<String, Integer> maxTemps = yearTemperatures
>>>>                 .groupByKey()
>>>>                 .combineValues(Aggregators.MAX_INTS())
>>>>                 .top(1);   //LINE THAT CAUSES THE ERROR
>>>>
>>>>         maxTemps.write(To.textFile(args[1]));
>>>>
>>>>         PipelineResult result = pipeline.done();
>>>>         String dot =
>>>> pipeline.getConfiguration().get("crunch.planner.dotfile");
>>>>         Files.write(dot, new File("pipeline.dot"), Charsets.UTF_8);
>>>>         Runtime.getRuntime().exec("dot -Tpng -O pipeline.dot");
>>>>         System.exit(result.succeeded() ? 0 : 1);
>>>>     }
>>>>
>>>>     static DoFn<String, Pair<String, Integer>> toYearTempPairsFn() {
>>>>         return new DoFn<String, Pair<String, Integer>>() {
>>>>             NcdcRecordParser parser = new NcdcRecordParser();
>>>>             @Override
>>>>             public void process(String input, Emitter<Pair<String,
>>>> Integer>> emitter) {
>>>>                 parser.parse(input);
>>>>                 if (parser.isValidTemperature()) {
>>>>                     emitter.emit(Pair.of(parser.getYear(),
>>>> parser.getAirTemperature()));
>>>>                 }
>>>>             }
>>>>         };
>>>>     }
>>>> }
>>>>
>>>>
>>>> Hadoop runs locally in standalone mode.
>>>> Hadoop version is: 2.7.0
>>>> Crunch version is: 0.12.0  (maven dependency: 0.12.0-hadoop2)
>>>>
>>>> I build my application with: mvn package
>>>> I run it with: hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar
>>>> ncdc_data/ output
>>>>
>>>> If I do not call .top(1) (see the comment //LINE THAT CAUSES THE ERROR),
>>>> everything works fine, but I obtain only the maximum temperatures for
>>>> each year, whereas I want the overall maximum temperature, or the top N
>>>> temperatures, for the whole data set.
>>>>
>>>> If I call .top(1) I obtain the following error:
>>>>
>>>> java.lang.Exception: org.apache.crunch.CrunchRuntimeException: Error
>>>> reloading writable comparable codes
>>>> at
>>>> org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
>>>> at
>>>> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
>>>> Caused by: org.apache.crunch.CrunchRuntimeException: Error reloading
>>>> writable comparable codes
>>>> at
>>>> org.apache.crunch.types.writable.TupleWritable.setConf(TupleWritable.java:71)
>>>> at
>>>> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
>>>> at
>>>> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
>>>> at
>>>> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
>>>> at
>>>> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
>>>> at
>>>> org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:2247)
>>>> at
>>>> org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2220)
>>>> at
>>>> org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.nextKeyValue(SequenceFileRecordReader.java:78)
>>>> at
>>>> org.apache.crunch.impl.mr.run.CrunchRecordReader.nextKeyValue(CrunchRecordReader.java:157)
>>>> at
>>>> org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
>>>> at
>>>> org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
>>>> at
>>>> org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
>>>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
>>>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>>>> at
>>>> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
>>>> at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.ClassNotFoundException:
>>>> org.apache.crunch.types.writable.TupleWritable
>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>> at java.lang.Class.forName0(Native Method)
>>>> at java.lang.Class.forName(Class.java:274)
>>>> at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>>>> at
>>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>>> at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483)
>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1333)
>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>> at
>>>> com.google.common.collect.Serialization.populateMap(Serialization.java:91)
>>>> at com.google.common.collect.HashBiMap.readObject(HashBiMap.java:109)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>> at
>>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>> at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>> at
>>>> org.apache.crunch.types.writable.Writables.reloadWritableComparableCodes(Writables.java:145)
>>>> at
>>>> org.apache.crunch.types.writable.TupleWritable.setConf(TupleWritable.java:69)
>>>> ... 20 more
>>>>
>>>>
>>>> Has anyone encountered this issue?
>>>> If you need any other details, please ask me.
>>>>
>>>> Thank you,
>>>> Florin
>>>>
>>>
>
