flink-user mailing list archives

From Stefano Bortoli <s.bort...@gmail.com>
Subject Re: starting flink job from bash script with maven
Date Fri, 24 Jul 2015 09:17:02 GMT
Hi Stephan,

I think I may have found the root of the problem. I do not build the
fat jar; I simply execute the main class with maven exec:java after the
default compile and install, so no uber jar is created and no shading is
applied. I will try building the fat jar and report back. The fact that it
runs so easily in Eclipse makes it somewhat confusing.
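For context, one common way to produce such a fat jar is the maven-shade-plugin. A minimal pom.xml sketch (the plugin version and configuration below are assumptions for illustration, not taken from this thread):

```xml
<!-- pom.xml fragment (hypothetical): builds a shaded/fat jar on `mvn package` -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

With this in place, `mvn package` emits a single jar that also contains the Mongo output format classes, so the job sees the same classpath outside Eclipse as inside it.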

saluti,
Stefano

2015-07-24 11:09 GMT+02:00 Stephan Ewen <sewen@apache.org>:

> Hi!
>
> There is probably something going wrong in MongoOutputFormat or MongoHadoop2OutputFormat.
> Something fails, but Java swallows the problem during serialization.
>
> It may be a classloading issue that is not reported. Are the
> MongoOutputFormat and the MongoHadoop2OutputFormat both in the fat jar?
> If not, try putting them in there.
>
> The last check we could do (to validate the Flink serialization utilities)
> is the code pasted below. If that does not cause the error, it is probably
> the issue described above.
>
> Greetings,
> Stephan
>
>
> ------------------------------
>
> UserCodeObjectWrapper<Object> userCode = new UserCodeObjectWrapper<Object>(
>         new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), Job.getInstance()));
> Configuration cfg = new Configuration();
> TaskConfig taskConfig = new TaskConfig(cfg);
> taskConfig.setStubWrapper(userCode);
> taskConfig.getStubWrapper(ClassLoader.getSystemClassLoader());
>
>
>
> On Fri, Jul 24, 2015 at 10:44 AM, Stefano Bortoli <s.bortoli@gmail.com>
> wrote:
>
>> I have run this test, and it completes without any exception:
>>
>> package org.tagcloud.persistence.batch.test;
>>
>> import java.io.IOException;
>>
>> import org.apache.commons.lang.SerializationUtils;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.tagcloud.persistence.batch.MongoHadoop2OutputFormat;
>>
>> import com.mongodb.hadoop.MongoOutputFormat;
>>
>> public class MongoHadoopSerializationTest {
>>
>>     public static void main(String[] args) {
>>         Job job;
>>         try {
>>             job = Job.getInstance();
>>             SerializationUtils.clone(new MongoHadoop2OutputFormat<>(
>>                     new MongoOutputFormat<>(), job));
>>         } catch (IOException e) {
>>             e.printStackTrace();
>>         }
>>
>>     }
>>
>> }
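The same round-trip check can be done with the JDK alone (no Apache Commons needed). A minimal sketch, using a stand-in Payload class instead of the real output format, that shows a consistent readObject/writeObject pair surviving serialization:

```java
import java.io.*;

public class RoundTripCheck {
    // Stand-in for the real output format (illustrative, not from this thread).
    // Its writeObject and readObject are consistent: every value written is read back.
    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        String name = "check";

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject(); // writes 'name'
            out.writeInt(42);         // extra custom data...
        }

        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();   // reads 'name'
            in.readInt();             // ...read back in the same order it was written
        }
    }

    // Serialize to a byte array and deserialize again, like SerializationUtils.clone.
    public static Payload roundTrip() throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new Payload());
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (Payload) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("round trip ok: " + roundTrip().name);
    }
}
```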
>>
>> 2015-07-24 10:01 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>>
>>> Hi!
>>>
>>> The user code object (the output format here) has a corrupt
>>> serialization routine.
>>>
>>> We use default Java serialization for these objects. Either the MongoHadoopOutputFormat
>>> cannot be serialized and swallows an exception, or it overrides the
>>> readObject() / writeObject() methods (from Java serialization) in an
>>> inconsistent way.
>>>
>>> To figure that out, can you try whether you can manually serialize the
>>> MongoHadoopOutputFormat?
>>>
>>> Can you try and call "SerializationUtils.clone(new
>>> MongoHadoopOutputFormat(...))", for example at the beginning of your main
>>> method? The SerializationUtils class is part of Apache Commons and is probably
>>> on your class path anyway.
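An inconsistent pair, by contrast, misaligns the stream and only surfaces at deserialization time, much like the error reported above. A minimal JDK-only sketch (BadFormat is illustrative, not a class from this thread) in which writeObject emits primitive data that readObject then tries to consume as an object:

```java
import java.io.*;

public class InconsistentSerialization {
    // Illustrative class whose writeObject and readObject disagree.
    static class BadFormat implements Serializable {
        private static final long serialVersionUID = 1L;

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            out.writeInt(42);   // written as primitive block data...
        }

        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            in.readObject();    // ...but read back as an object: stream mismatch
        }
    }

    // Returns the name of the exception raised during deserialization,
    // or "no error" if the round trip unexpectedly succeeds.
    public static String check() throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new BadFormat());
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            ois.readObject();
            return "no error";
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(check());
    }
}
```

Running the clone check at the top of main, as suggested above, surfaces this kind of mismatch immediately instead of deep inside job submission.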
>>>
>>> Stephan
>>>
>>>
>>> On Fri, Jul 24, 2015 at 9:51 AM, Stefano Bortoli <bortoli@okkam.it>
>>> wrote:
>>>
>>>> Hi guys!
>>>>
>>>> I have written a data maintenance job using Flink on MongoDB. The job
>>>> runs smoothly if I start it from Eclipse. However, when I try to run it
>>>> using a bash script that invokes maven exec:java, I get a serialization
>>>> exception:
>>>> org.apache.flink.runtime.client.JobExecutionException: Cannot
>>>> initialize task 'DataSink
>>>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)':
>>>> Deserializing the OutputFormat
>>>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)
>>>> failed: Could not read the user code wrapper: unexpected block data
>>>>     at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>>>>     at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>>>
>>>> Attached is the complete stack trace. I thought it was a matter of
>>>> serializable classes, so I have made all my classes serializable, but I
>>>> still have the same error. Perhaps it is not possible to do these things
>>>> with Flink.
>>>>
>>>> Any intuition? Is it doable?
>>>>
>>>> thanks a lot for your support. :-)
>>>>
>>>> saluti,
>>>>
>>>> Stefano Bortoli, PhD
>>>>
>>>> *ENS Technical Director *
>>>> _______________________________________________
>>>> *OKKAM Srl - www.okkam.it <http://www.okkam.it/>*
>>>>
>>>> *Email:* bortoli@okkam.it
>>>>
>>>> *Phone nr: +39 0461 1823912*
>>>>
>>>> *Headquarters:* Trento (Italy), Via Trener 8
>>>> *Registered office:* Trento (Italy), via Segantini 23
>>>>
>>>> Confidentiality notice. This e-mail transmission may contain legally
>>>> privileged and/or confidential information. Please do not read it if you
>>>> are not the intended recipient(s). Any use, distribution, reproduction, or
>>>> disclosure by any other person is strictly prohibited. If you have received
>>>> this e-mail in error, please notify the sender and destroy the original
>>>> transmission and its attachments without reading or saving it in any manner.
>>>>
>>>>
>>>
>>
>
