flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: starting flink job from bash script with maven
Date Fri, 24 Jul 2015 09:09:23 GMT
Hi!

There is probably something going wrong in MongoOutputFormat or
MongoHadoop2OutputFormat. Something fails, but Java swallows the problem
during serialization.

It may be a classloading issue that goes unreported. Are MongoOutputFormat
and MongoHadoop2OutputFormat both in the fat jar? If not, try putting them
in there.
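One quick way to confirm that the classes are actually visible at runtime is a plain Class.forName probe. This is a minimal sketch, not Flink-specific; the second class name is the one from this thread, and in a real job you would run the check at the start of your main method:

```java
// Minimal sketch: probe whether the Mongo output format classes are visible
// to the current classloader. A "false" here points at a packaging problem
// (class missing from the fat jar) rather than a serialization bug.
public class ClasspathCheck {

    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: we only care about visibility, not static init
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("MongoOutputFormat: "
                + isOnClasspath("com.mongodb.hadoop.MongoOutputFormat"));
        System.out.println("MongoHadoop2OutputFormat: "
                + isOnClasspath("org.tagcloud.persistence.batch.MongoHadoop2OutputFormat"));
    }
}
```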

The last check we can do (to validate the Flink serialization utilities)
is the code pasted below. If that does not cause the error, the problem is
probably the one described above.

Greetings,
Stephan


------------------------------

UserCodeObjectWrapper<Object> userCode = new UserCodeObjectWrapper<Object>(
        new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), Job.getInstance()));
Configuration cfg = new Configuration();
TaskConfig taskConfig = new TaskConfig(cfg);
taskConfig.setStubWrapper(userCode);
taskConfig.getStubWrapper(ClassLoader.getSystemClassLoader());
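The same kind of check can be done with plain Java serialization, without any Flink classes. This is a minimal, self-contained sketch: the String is only a stand-in, and you would substitute the MongoHadoop2OutputFormat instance to see whether a hidden exception surfaces during the round trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializationCheck {

    // Serialize and immediately deserialize an object. Any broken
    // writeObject()/readObject() pair will throw here instead of being
    // swallowed somewhere inside the job submission.
    static Object roundTrip(Object o) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in object; replace with the actual output format instance.
        System.out.println(roundTrip("hello"));
    }
}
```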



On Fri, Jul 24, 2015 at 10:44 AM, Stefano Bortoli <s.bortoli@gmail.com>
wrote:

> I have run this test, and it completes without any exception:
>
> package org.tagcloud.persistence.batch.test;
>
> import java.io.IOException;
>
> import org.apache.commons.lang.SerializationUtils;
> import org.apache.hadoop.mapreduce.Job;
> import org.tagcloud.persistence.batch.MongoHadoop2OutputFormat;
>
> import com.mongodb.hadoop.MongoOutputFormat;
>
> public class MongoHadoopSerializationTest {
>
>     public static void main(String[] args) {
>         Job job;
>         try {
>             job = Job.getInstance();
>             SerializationUtils.clone(
>                 new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), job));
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
>
>     }
>
> }
>
> 2015-07-24 10:01 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>
>> Hi!
>>
>> The user code object (the output format here) has a corrupt serialization
>> routine.
>>
>> We use default Java Serialization for these objects. Either the MongoHadoopOutputFormat
>> cannot be serialized and swallows an exception, or it overrides the
>> readObject() / writeObject() methods (from Java Serialization) in an
>> inconsistent way.
>>
>> To figure that out, can you try whether you can manually serialize the
>> MongoHadoopOutputFormat?
>>
>> Can you try and call "SerializationUtils.clone(new
>> MongoHadoopOutputFormat())", for example at the beginning of your main
>> method? The SerializationUtils class is part of Apache Commons Lang and is
>> probably on your classpath anyway.
>>
>> Stephan
>>
>>
>> On Fri, Jul 24, 2015 at 9:51 AM, Stefano Bortoli <bortoli@okkam.it>
>> wrote:
>>
>>> Hi guys!
>>>
>>> I wrote a data maintenance job using Flink on MongoDB. The job
>>> runs smoothly if I start it from Eclipse. However, when I try to run it
>>> using a bash script that invokes maven exec:java, I get a serialization
>>> exception:
>>> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize
>>> task 'DataSink
>>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)':
>>> Deserializing the OutputFormat
>>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)
>>> failed: Could not read the user code wrapper: unexpected block data
>>>     at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>>>     at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>>
>>> Attached is the complete stack trace. I thought it was a matter of
>>> serializable classes, so I have made all my classes serializable... still I
>>> get the same error. Perhaps it is not possible to do these things with
>>> Flink.
>>>
>>> any intuition? is it doable?
>>>
>>> thanks a lot for your support. :-)
>>>
>>> saluti,
>>>
>>> Stefano Bortoli, PhD
>>>
>>> ENS Technical Director
>>> OKKAM Srl - www.okkam.it <http://www.okkam.it/>
>>>
>>> Email: bortoli@okkam.it
>>> Phone nr: +39 0461 1823912
>>>
>>> Headquarters: Trento (Italy), Via Trener 8
>>> Registered office: Trento (Italy), via Segantini 23
>>>
>>>
>>>
>>
>
