beam-user mailing list archives

From Mahesh Vangala <vangalamahe...@gmail.com>
Subject Re: Launching a subprocess in DoFn
Date Mon, 20 Aug 2018 15:05:31 GMT
Hello Romain -

So, I have added pb.inheritIO().start().waitFor(); and now I get this error:
/bin/bash: docker: command not found
But I do have docker installed on the system, at /usr/local/bin/docker.
Any ideas why I'm seeing this error when the process is launched from
within the DoFn?
Thank you so much for your help.

*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*
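
One possible explanation, not confirmed anywhere in this thread: bash looks up "docker" through the PATH it inherits from the JVM, and a JVM started from an IDE or a service may have a PATH that lacks /usr/local/bin. A minimal sketch of one workaround, prepending a directory to the child's PATH, is below. The class and method names (PathAwareLauncher, withExtraPath) are hypothetical and are not part of Beam or of the code in this thread.

```java
import java.io.File;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class PathAwareLauncher {

    // Builds a ProcessBuilder whose child environment has extraDir
    // prepended to PATH, so that commands installed there can be resolved.
    static ProcessBuilder withExtraPath(String extraDir, String... command) {
        ProcessBuilder pb = new ProcessBuilder(command);
        Map<String, String> env = pb.environment();
        String current = env.getOrDefault("PATH", "");
        env.put("PATH", current.isEmpty()
                ? extraDir
                : extraDir + File.pathSeparator + current);
        return pb;
    }
}
```

Inside the DoFn this could be used as PathAwareLauncher.withExtraPath("/usr/local/bin", "/bin/bash", "-c", "docker run --rm ubuntu:16.04 sleep 20").inheritIO().start().waitFor(). Invoking /usr/local/bin/docker by its absolute path would be a simpler alternative if the location is fixed.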


On Mon, Aug 20, 2018 at 10:58 AM Romain Manni-Bucau <rmannibucau@gmail.com>
wrote:

> Weird, this code works:
>
> https://gist.github.com/rmannibucau/4703f321bb1962d1303f8eccbd05df0e
>
> Are you sure your test_in.csv has some data (otherwise no DoFn processing
> will be triggered)?
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
>
> On Mon, Aug 20, 2018 at 16:33, Mahesh Vangala <vangalamaheshh@gmail.com>
> wrote:
>
>> Hi Romain -
>>
>> I don't see any errors when I use waitFor().
>> However, I don't see those processes being executed either, since "docker
>> ps -a" doesn't list any containers.
>> Regarding "If your engine (spark, dataflow etc) doesn't have a security
>> manager active": I am using DirectRunner though.
>> Let me know.
>> Thank you!
>>
>>
>> On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau <
>> rmannibucau@gmail.com> wrote:
>>
>>> Hi Mahesh,
>>>
>>> Did you get the same error? This is quite unrelated to beam itself
>>> normally. If your engine (spark, dataflow etc) doesn't have a security
>>> manager active it should be enough, if it has you can be forbidden to use
>>> that.
>>>
>>>
>>> On Mon, Aug 20, 2018 at 16:08, Mahesh Vangala <vangalamaheshh@gmail.com>
>>> wrote:
>>>
>>>> Hello Romain -
>>>>
>>>> I did try that, still no luck.
>>>> Also, when I put the process start logic into a separate test script, I
>>>> do see a running docker container when I run "docker ps".
>>>> However, no such luck implementing that logic within a DoFn.
>>>> Any thoughts?
>>>> Thank you.
>>>>
>>>> Regards,
>>>> Mahesh
>>>>
>>>>
>>>> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <
>>>> rmannibucau@gmail.com> wrote:
>>>>
>>>>> Did you mean waitFor() rather than the Java wait() primitive?
>>>>>
>>>>> On Sun, Aug 19, 2018 at 04:35, Mahesh Vangala <vangalamaheshh@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello Beamers -
>>>>>>
>>>>>> I am trying to put together a POC: launch a docker container per element
>>>>>> in the input PCollection and then return some data to the output
>>>>>> PCollection.
>>>>>>
>>>>>> Here is my code:
>>>>>>
>>>>>> public class VariantCaller
>>>>>> {
>>>>>>     public static void main( String[] args )
>>>>>>     {
>>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>>         PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
>>>>>>         PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
>>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>>         p.run();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> My LaunchDocker Code:
>>>>>>
>>>>>>
>>>>>> public class LaunchDocker {
>>>>>>   public static class LaunchJobs extends DoFn<String, String> {
>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(AddLines.class);
>>>>>>
>>>>>>     @ProcessElement
>>>>>>     public void processElement(ProcessContext c) throws Exception {
>>>>>>       // Get the input element from ProcessContext.
>>>>>>       String word = c.element().split(",")[0];
>>>>>>       LOG.info(word);
>>>>>>       ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>>>>>           "docker run --rm ubuntu:16.04 sleep 20");
>>>>>>       pb.start().wait();
>>>>>>       // Use ProcessContext.output to emit the output element.
>>>>>>       if (!word.isEmpty())
>>>>>>         c.output(word + "\n");
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> However, this fails with error:
>>>>>>
>>>>>>
>>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>> INFO: sample1
>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>> INFO: 4
>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>> INFO: 1
>>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>>>> Caused by: java.lang.IllegalMonitorStateException
>>>>>>     at java.lang.Object.wait(Native Method)
>>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>>     at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
>>>>>>
>>>>>>
>>>>>> Can you share your ideas on the best way of achieving this?
>>>>>>
>>>>>> Thank you for your help!
>>>>>>
>>>>>>
>>>>>> Sincerely,
>>>>>>
>>>>>> Mahesh
>>>>>>
>>>>>
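
For reference, the IllegalMonitorStateException in the first message of the thread is what Object.wait() throws when the calling thread does not hold that object's monitor; it is a thread-coordination primitive and has nothing to do with subprocesses. Process.waitFor() is the call that blocks until the child process exits and returns its exit code. A minimal sketch of that pattern outside Beam follows; the class and method names (SubprocessRunner, runAndWait) are hypothetical, and wrapping the checked exceptions is just one possible choice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper, for illustration only.
final class SubprocessRunner {

    // Starts the command, forwards its stdout/stderr to the parent JVM,
    // blocks until it exits, and returns its exit code.
    static int runAndWait(String... command) {
        try {
            Process process = new ProcessBuilder(command)
                    .inheritIO()   // child output shows up in the parent's console
                    .start();
            return process.waitFor(); // Process.waitFor(), not Object.wait()
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for subprocess", e);
        }
    }
}
```

Checking the returned exit code (non-zero usually signals failure) is what surfaces problems such as a command that cannot be found, which would otherwise pass silently.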
