beam-user mailing list archives

From Mahesh Vangala <vangalamahe...@gmail.com>
Subject Re: Launching a subprocess in DoFn
Date Mon, 20 Aug 2018 14:32:55 GMT
Hi Romain -

I don't see any errors when I use waitFor().
However, I don't see those processes being executed either, since "docker ps
-a" doesn't list any processes.

> This is quite unrelated to beam itself normally. If your engine (spark,
> dataflow etc) doesn't have a security manager active

I am using DirectRunner though.
Let me know.
Thank you!

*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*
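
When waitFor() returns without an exception but no container shows up, the exit code and the command's stderr are usually the first things worth checking. The following is a minimal sketch, assuming a Unix-like host with /bin/bash available; the class name SubprocessRunner and its Result type are hypothetical helpers for illustration, not part of Beam:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

// Hypothetical helper (not a Beam API): runs a command and reports what happened.
public class SubprocessRunner {

    /** Exit code plus combined stdout/stderr of a finished command. */
    public static final class Result {
        public final int exitCode;
        public final String output;

        Result(int exitCode, String output) {
            this.exitCode = exitCode;
            this.output = output;
        }
    }

    public static Result run(String... command) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        // Merge stderr into stdout so error messages are not silently dropped.
        pb.redirectErrorStream(true);
        Process process = pb.start();

        // Drain the output before waiting, so the child cannot block on a full pipe.
        String output;
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            output = reader.lines().collect(Collectors.joining("\n"));
        }

        // Process.waitFor() blocks until the child exits and returns its exit code.
        int exitCode = process.waitFor();
        return new Result(exitCode, output);
    }

    public static void main(String[] args) throws Exception {
        // Harmless default command; pass the real command as program arguments.
        String[] command = args.length > 0
                ? args
                : new String[] {"/bin/bash", "-c", "echo hello"};
        Result result = run(command);
        System.out.println("exit code: " + result.exitCode);
        System.out.println(result.output);
    }
}
```

Calling run("/bin/bash", "-c", "docker run --rm ubuntu:16.04 sleep 20") from inside processElement and logging both fields would show whether docker itself reported an error. Note also that --rm removes the container as soon as it exits, so "docker ps -a" run after the pipeline has finished would not list it even if it ran successfully.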


On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau <rmannibucau@gmail.com>
wrote:

> Hi Mahesh,
>
> Did you get the same error? This is normally unrelated to Beam itself.
> If your engine (Spark, Dataflow, etc.) doesn't have a security manager
> active, that should be enough; if it does, you may be forbidden from
> doing that.
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
>
> On Mon, Aug 20, 2018 at 4:08 PM Mahesh Vangala <vangalamaheshh@gmail.com>
> wrote:
>
>> Hello Romain -
>>
>> I did try that, but still no luck.
>> Also, when I put the process start logic into a separate test script, I do
>> see a running docker container when I run "docker ps".
>> However, I have had no such luck implementing that logic within a DoFn.
>> Any thoughts?
>> Thank you.
>>
>> Regards,
>> Mahesh
>>
>>
>>
>> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <rmannibucau@gmail.com>
>> wrote:
>>
>>> waitFor() and not the Java wait() primitive?
>>>
>>> On Sun, Aug 19, 2018 at 4:35 AM Mahesh Vangala <vangalamaheshh@gmail.com>
>>> wrote:
>>>
>>>> Hello Beamers -
>>>>
>>>> I am trying to put together a POC: launch a docker container per element
>>>> in the input PCollection and then return some data to the output
>>>> PCollection.
>>>>
>>>> Here is my code:
>>>>
>>>> public class VariantCaller
>>>> {
>>>>     public static void main( String[] args )
>>>>     {
>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>         Pipeline p = Pipeline.create(opts);
>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>         PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
>>>>         PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>         p.run();
>>>>     }
>>>> }
>>>>
>>>>
>>>> My LaunchDocker Code:
>>>>
>>>>
>>>> public class LaunchDocker {
>>>>   public static class LaunchJobs extends DoFn<String, String> {
>>>>     private static final long serialVersionUID = 1L;
>>>>     private static final Logger LOG = LoggerFactory.getLogger(AddLines.class);
>>>>
>>>>     @ProcessElement
>>>>     public void processElement(ProcessContext c) throws Exception {
>>>>       // Get the input element from ProcessContext.
>>>>       String word = c.element().split(",")[0];
>>>>       LOG.info(word);
>>>>       ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>>>           "docker run --rm ubuntu:16.04 sleep 20");
>>>>       pb.start().wait();
>>>>       // Use ProcessContext.output to emit the output element.
>>>>       if (!word.isEmpty())
>>>>         c.output(word + "\n");
>>>>     }
>>>>   }
>>>> }
>>>>
>>>>
>>>> However, this fails with error:
>>>>
>>>>
>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>> INFO: sample1
>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>> INFO: 4
>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>> INFO: 1
>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>> Caused by: java.lang.IllegalMonitorStateException
>>>>     at java.lang.Object.wait(Native Method)
>>>>     at java.lang.Object.wait(Object.java:502)
>>>>     at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
>>>>
>>>>
>>>> Can you share your ideas on the best way of achieving this?
>>>>
>>>> Thank you for your help!
>>>>
>>>>
>>>> Sincerely,
>>>>
>>>> Mahesh
>>>>
>>>>
>>>>
>>>>
>>>
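
The IllegalMonitorStateException in the original trace comes from Object.wait(), which may only be called by a thread that holds the object's monitor (that is, inside a synchronized block on that object). Process.waitFor() is a different method: it blocks until the child process exits and returns its exit code. A minimal sketch of the difference, assuming a Unix-like host with /bin/bash; the class and method names are hypothetical and chosen only for illustration:

```java
import java.io.IOException;

// Hypothetical demo class: contrasts Object.wait() with Process.waitFor().
public class WaitVersusWaitFor {

    /** Returns true if calling wait() without owning the monitor throws. */
    static boolean waitThrowsWithoutMonitor(Object target) throws InterruptedException {
        try {
            // Not inside synchronized (target), so the JVM rejects the call.
            target.wait(1);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Starts the command, waits for it to exit, and returns its exit code. */
    static int runAndWait(String... command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .inheritIO() // let the child's stdout/stderr appear in our console
                .start();
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("wait() without monitor throws: "
                + waitThrowsWithoutMonitor(new Object()));
        System.out.println("exit code from waitFor(): "
                + runAndWait("/bin/bash", "-c", "exit 7"));
    }
}
```

In the DoFn from the first message, this corresponds to replacing pb.start().wait() with pb.start().waitFor() and, ideally, checking the returned exit code before emitting output.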
