crunch-user mailing list archives

From Florin Tatu <tatuflo...@gmail.com>
Subject Re: Error while trying to obtain the top elements.
Date Thu, 09 Jul 2015 04:20:30 GMT
Hi Gabriel, David,

My mistake here. I thought I should use an Avro schema. I am pretty new to
Hadoop and Crunch.

The Avro types from Crunch work.

I also applied the patch by following the steps you provided and it works.

For the moment, as you recommended, I will stick to the Avro types until the
fix is released.

Thank you for the support and the guidance,
Florin

On Wed, Jul 8, 2015 at 7:41 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:

> Hi Florin,
>
> The use of Avros that David and I were referring to basically just
> involves changing your import statements in your original source file from:
>
>   import static org.apache.crunch.types.writable.Writables.ints;
>   import static org.apache.crunch.types.writable.Writables.strings;
>   import static org.apache.crunch.types.writable.Writables.tableOf;
>
> to:
>
>   import static org.apache.crunch.types.avro.Avros.ints;
>   import static org.apache.crunch.types.avro.Avros.strings;
>   import static org.apache.crunch.types.avro.Avros.tableOf;
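>
> The rest of your pipeline code stays exactly the same; for example, the table
> declaration from your program keeps this shape, just backed by Avro-based PTypes
> instead of Writables:
>
>   PTable<String, Integer> yearTemperatures = records
>           .parallelDo(toYearTempPairsFn(), tableOf(strings(), ints()));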
>
>
> The specific problem that you seem to be running into now is due to Hadoop
> 2.7.0 still shipping with avro 1.7.4, in which the underlying Avro bugs
> that contributed to CRUNCH-360 are still present. You should be able to get
> around that by swapping out the avro jar files
> under share/hadoop/common/lib in your Hadoop installation (as mentioned in
> CRUNCH-360).
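>
> For example, something along these lines (treat this as a sketch: the exact jar
> names, the replacement Avro version, and $HADOOP_HOME pointing at your installation
> are assumptions; CRUNCH-360 has the details):
>
>     $ cd $HADOOP_HOME/share/hadoop/common/lib
>     $ mv avro-1.7.4.jar avro-1.7.4.jar.bak
>     $ cp /path/to/newer/avro-1.7.7.jar .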
>
> As for applying the patch from CRUNCH-539, you can do it as follows:
> 1. Download the patch to your local system
> 2. Clone the Crunch git repo
>     $ git clone https://git-wip-us.apache.org/repos/asf/crunch.git
>
> 3. Apply the patch with git
>     $ git am path/to/CRUNCH-539.patch
>
> 4. Rebuild Crunch
>     $ mvn clean install -DskipTests
>
> 5. Update the pom in your own project to refer to crunch 0.13-SNAPSHOT,
> and rebuild your own project
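>
> For reference, the dependency in your project's pom would then look something like
> this (the artifactId assumes your project depends on crunch-core, as the crunch-demo
> archetype does; take the exact version string from the pom of the Crunch checkout
> you just built):
>
>     <dependency>
>       <groupId>org.apache.crunch</groupId>
>       <artifactId>crunch-core</artifactId>
>       <!-- use the version declared by the freshly built snapshot -->
>       <version>0.13.0-SNAPSHOT</version>
>     </dependency>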
>
>
> - Gabriel
>
>
>
>
> On Wed, Jul 8, 2015 at 11:22 AM Florin Tatu <tatuflorin@gmail.com> wrote:
>
>> Hi David, Gabriel,
>>
>> Using Avros, I get org.apache.avro.UnresolvedUnionException: Not in union
>>
>> The situation is similar to the Crunch example from the Hadoop book (first make
>> a clone: git clone https://github.com/tomwhite/hadoop-book.git).
>> If you run the following commands in hadoop-book/ch18-crunch:
>>
>> mvn package
>> hadoop jar target/ch18-crunch-4.0-job.jar crunch.AvroGenericMaxTemperatureCrunch ncdc_data output
>>
>> you obtain the following exception:
>>
>> java.lang.Exception: org.apache.avro.UnresolvedUnionException: Not in
>> union
>> [{"type":"record","name":"WeatherRecord","namespace":"org.apache.avro.mapred","doc":"A
>> weather
>> reading.","fields":[{"name":"year","type":"int"},{"name":"temperature","type":"int"},{"name":"stationId","type":"string"}]},"null"]:
>> {"year": 1928, "temperature": 28, "stationId": "011060-99999"}
>> at
>> org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
>> at
>> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
>> Caused by: org.apache.avro.UnresolvedUnionException: Not in union
>> [{"type":"record","name":"WeatherRecord","namespace":"org.apache.avro.mapred","doc":"A
>> weather
>> reading.","fields":[{"name":"year","type":"int"},{"name":"temperature","type":"int"},{"name":"stationId","type":"string"}]},"null"]:
>> {"year": 1928, "temperature": 28, "stationId": "011060-99999"}
>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:561)
>> at
>> org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:144)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
>> at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>> at
>> org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:128)
>> at
>> org.apache.crunch.types.avro.SafeAvroSerialization$AvroWrapperSerializer.serialize(SafeAvroSerialization.java:113)
>> at
>> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1157)
>> at
>> org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
>> at
>> org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
>> at
>> org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
>> at
>> org.apache.crunch.impl.mr.emit.OutputEmitter.emit(OutputEmitter.java:41)
>> at org.apache.crunch.MapFn.process(MapFn.java:34)
>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>> at
>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> at
>> crunch.AvroGenericMaxTemperatureCrunch$2.process(AvroGenericMaxTemperatureCrunch.java:95)
>> at
>> crunch.AvroGenericMaxTemperatureCrunch$2.process(AvroGenericMaxTemperatureCrunch.java:82)
>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>> at
>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> at org.apache.crunch.MapFn.process(MapFn.java:34)
>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>> at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>> at
>> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> As far as I have researched, it seems to be related to:
>> https://issues.apache.org/jira/browse/CRUNCH-360
>> (but please correct me if I am wrong).
>>
>> If this is a known issue, is there another alternative?
>>
>> PS: @Gabriel Reid: I saw you made a patch for the issue
>> https://issues.apache.org/jira/browse/CRUNCH-539
>> I am unfamiliar with how to apply it to the project; can you please provide
>> me with some minimal guidance? :)
>>
>>
>> - Florin
>>
>> On Tue, Jul 7, 2015 at 11:45 PM, Gabriel Reid <gabriel.reid@gmail.com>
>> wrote:
>>
>>> Hi Florin,
>>>
>>> Thanks for the very detailed report. That appears to be a bug, brought
>>> on by the way that ObjectInputStream works with classloaders, together with
>>> how Hadoop manipulates classloaders.
>>>
>>> I've logged a JIRA ticket [1] for this. Like David, I would recommend using
>>> Avros instead of Writables for now, as that should get around this issue
>>> without any other consequences.
>>>
>>> - Gabriel
>>>
>>> 1. https://issues.apache.org/jira/browse/CRUNCH-539
>>>
>>>
>>> On Tue, Jul 7, 2015 at 3:27 PM David Ortiz <dpo5003@gmail.com> wrote:
>>>
>>>> That looks weird.  Can you try it using Avros in place of Writables and
>>>> see if it does the same thing?
>>>>
>>>> On Tue, Jul 7, 2015, 3:43 AM Florin Tatu <tatuflorin@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a job that processes a set of files containing climatic
>>>>> data (more exactly, data from this location:
>>>>> ftp://ftp.ncdc.noaa.gov/pub/data/noaa/).
>>>>>
>>>>> I downloaded and merged the data using a script, so I have one
>>>>> folder (ncdc_data) containing a .gz archive for each year (e.g. 1901.gz,
>>>>> 1902.gz, etc.).
>>>>> Each archive contains only one text file.
>>>>>
>>>>> My code is:
>>>>>
>>>>> import com.google.common.base.Charsets;
>>>>> import com.google.common.io.Files;
>>>>> import org.apache.crunch.*;
>>>>> import org.apache.crunch.fn.Aggregators;
>>>>> import org.apache.crunch.impl.mr.MRPipeline;
>>>>> import org.apache.crunch.io.To;
>>>>> import java.io.File;
>>>>> import static org.apache.crunch.types.writable.Writables.ints;
>>>>> import static org.apache.crunch.types.writable.Writables.strings;
>>>>> import static org.apache.crunch.types.writable.Writables.tableOf;
>>>>>
>>>>> public class MaxTemperatureCrunch {
>>>>>
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         if (args.length != 2) {
>>>>>             System.err.println("Usage: MaxTemperatureCrunch <input path> <output path>");
>>>>>             System.exit(-1);
>>>>>         }
>>>>>
>>>>>         Pipeline pipeline = new MRPipeline(MaxTemperatureCrunch.class);
>>>>>
>>>>>         PCollection<String> records = pipeline.readTextFile(args[0]);
>>>>>
>>>>>         PTable<String, Integer> yearTemperatures = records
>>>>>                 .parallelDo(toYearTempPairsFn(), tableOf(strings(), ints()));
>>>>>
>>>>>         PTable<String, Integer> maxTemps = yearTemperatures
>>>>>                 .groupByKey()
>>>>>                 .combineValues(Aggregators.MAX_INTS())
>>>>>                 .top(1);   // LINE THAT CAUSES THE ERROR
>>>>>
>>>>>         maxTemps.write(To.textFile(args[1]));
>>>>>
>>>>>         PipelineResult result = pipeline.done();
>>>>>         String dot = pipeline.getConfiguration().get("crunch.planner.dotfile");
>>>>>         Files.write(dot, new File("pipeline.dot"), Charsets.UTF_8);
>>>>>         Runtime.getRuntime().exec("dot -Tpng -O pipeline.dot");
>>>>>         System.exit(result.succeeded() ? 0 : 1);
>>>>>     }
>>>>>
>>>>>     static DoFn<String, Pair<String, Integer>> toYearTempPairsFn() {
>>>>>         return new DoFn<String, Pair<String, Integer>>() {
>>>>>             NcdcRecordParser parser = new NcdcRecordParser();
>>>>>             @Override
>>>>>             public void process(String input, Emitter<Pair<String, Integer>> emitter) {
>>>>>                 parser.parse(input);
>>>>>                 if (parser.isValidTemperature()) {
>>>>>                     emitter.emit(Pair.of(parser.getYear(), parser.getAirTemperature()));
>>>>>                 }
>>>>>             }
>>>>>         };
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> Hadoop runs locally in standalone mode.
>>>>> Hadoop version is: 2.7.0
>>>>> Crunch version is: 0.12.0  (maven dependency: 0.12.0-hadoop2)
>>>>>
>>>>> I build my application with: mvn package
>>>>> I run it with: hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar
>>>>> ncdc_data/ output
>>>>>
>>>>> If I do not call .top(1) (see the comment: // LINE THAT CAUSES THE ERROR),
>>>>> everything works fine, but then I obtain only the maximum temperature for each
>>>>> year, and I want the overall maximum temperature or the top N temperatures for
>>>>> the whole data set.
>>>>>
>>>>> If I call .top(1) I obtain the following error:
>>>>>
>>>>> java.lang.Exception: org.apache.crunch.CrunchRuntimeException: Error
>>>>> reloading writable comparable codes
>>>>> at
>>>>> org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
>>>>> at
>>>>> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
>>>>> Caused by: org.apache.crunch.CrunchRuntimeException: Error reloading
>>>>> writable comparable codes
>>>>> at
>>>>> org.apache.crunch.types.writable.TupleWritable.setConf(TupleWritable.java:71)
>>>>> at
>>>>> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
>>>>> at
>>>>> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
>>>>> at
>>>>> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:66)
>>>>> at
>>>>> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
>>>>> at
>>>>> org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:2247)
>>>>> at
>>>>> org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2220)
>>>>> at
>>>>> org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.nextKeyValue(SequenceFileRecordReader.java:78)
>>>>> at
>>>>> org.apache.crunch.impl.mr.run.CrunchRecordReader.nextKeyValue(CrunchRecordReader.java:157)
>>>>> at
>>>>> org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
>>>>> at
>>>>> org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
>>>>> at
>>>>> org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
>>>>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
>>>>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>>>>> at
>>>>> org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
>>>>> at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>> org.apache.crunch.types.writable.TupleWritable
>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>>> at java.lang.Class.forName0(Native Method)
>>>>> at java.lang.Class.forName(Class.java:274)
>>>>> at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>>>>> at
>>>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>>>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>>>> at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483)
>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1333)
>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>> at
>>>>> com.google.common.collect.Serialization.populateMap(Serialization.java:91)
>>>>> at com.google.common.collect.HashBiMap.readObject(HashBiMap.java:109)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>> at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>> at
>>>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>> at
>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>> at
>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>> at
>>>>> org.apache.crunch.types.writable.Writables.reloadWritableComparableCodes(Writables.java:145)
>>>>> at
>>>>> org.apache.crunch.types.writable.TupleWritable.setConf(TupleWritable.java:69)
>>>>> ... 20 more
>>>>>
>>>>>
>>>>> Did anyone encounter this issue?
>>>>> If you need any other details, please ask me.
>>>>>
>>>>> Thank you,
>>>>> Florin
>>>>>
>>>>
>>
