From: Mahesh Vangala <vangalamaheshh@gmail.com>
Date: Mon, 20 Aug 2018 11:05:31 -0400
Subject: Re: Launching a subprocess in DoFn
To: user@beam.apache.org

Hello Romain -

So, I have added pb.inheritIO().start().waitFor(); and now I get the error "/bin/bash: docker: command not found".
But I do have Docker installed on the system, at /usr/local/bin/docker.
Any ideas why I'm seeing this error when the process is launched from within a DoFn?
Thank you so much for your help.
--
Mahesh Vangala
(Ph) 443-326-1957
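A possible cause, offered as an assumption rather than a diagnosis: bash -c resolves docker through the PATH inherited from the JVM, and a JVM launched from an IDE or a build tool does not necessarily have /usr/local/bin on its PATH, even when an interactive shell does. The sketch below shows two workarounds: invoke the binary by its absolute path, or prepend its directory to the child's PATH through ProcessBuilder.environment(). The class and method names (DockerPath, absolute, withPath) are hypothetical; /usr/local/bin is the location given in the message above. Nothing is started here, so it runs without Docker installed.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds ProcessBuilders whose child process can find the
// docker binary even if the JVM's own PATH lacks its directory (an assumption).
public class DockerPath {

    // Directory where docker lives on the poster's machine.
    static final String DOCKER_DIR = "/usr/local/bin";

    /** Option 1: call docker by absolute path, without going through bash. */
    static ProcessBuilder absolute(List<String> dockerArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add(DOCKER_DIR + File.separator + "docker");
        cmd.addAll(dockerArgs);
        return new ProcessBuilder(cmd);
    }

    /** Option 2: keep "bash -c", but prepend DOCKER_DIR to the child's PATH. */
    static ProcessBuilder withPath(String shellCommand) {
        ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c", shellCommand);
        Map<String, String> env = pb.environment(); // a copy of the JVM's environment
        String current = env.getOrDefault("PATH", "");
        env.put("PATH", current.isEmpty() ? DOCKER_DIR : DOCKER_DIR + File.pathSeparator + current);
        return pb;
    }

    public static void main(String[] args) {
        ProcessBuilder a = absolute(List.of("run", "--rm", "ubuntu:16.04", "sleep", "20"));
        System.out.println(a.command());
        ProcessBuilder b = withPath("docker run --rm ubuntu:16.04 sleep 20");
        System.out.println(b.environment().get("PATH"));
    }
}
```

Either builder is then started as before, e.g. pb.inheritIO().start().waitFor().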


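On Mon, Aug 20, 2018 at 10:58 AM Romain Manni-Bucau <rmannibucau@gmail.com> wrote:

Weird, this code works:
https://gist.github.com/rmannibucau/4703f321bb1962d1303f8eccbd05df0e

Are you sure your test_in.csv has some data (otherwise no DoFn processing will be triggered)?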

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | Book


On Mon, Aug 20, 2018 at 16:33, Mahesh Vangala <vangalamaheshh@gmail.com> wrote:
Hi Romain -

I don't see any errors when I use waitFor().
However, I don't see those processes being executed either, since "docker ps -a" doesn't list any containers.

> This is quite unrelated to Beam itself, normally. If your engine (Spark, Dataflow, etc.) doesn't have a security manager active

I am using the DirectRunner, though.
Let me know.
Thank you!
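When waitFor() returns without an exception, the exit code and the output are still worth checking. By default ProcessBuilder pipes the child's stdout and stderr back to the JVM, so a message such as "command not found" is lost unless it is read (or inheritIO() is used). Note also that docker run --rm removes the container when it exits, so a finished container will not appear in "docker ps -a" in any case. A minimal sketch, assuming Java 9+; RunAndCapture and its Result type are hypothetical names:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper: runs a command, merges stderr into stdout, and returns
// both the exit code and everything the command printed.
public class RunAndCapture {

    static final class Result {
        final int exitCode;
        final String output;

        Result(int exitCode, String output) {
            this.exitCode = exitCode;
            this.output = output;
        }
    }

    static Result run(List<String> command) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // stderr ("command not found", etc.) joins stdout
        Process process = pb.start();
        String output;
        try (InputStream in = process.getInputStream()) {
            // Read everything first, so a chatty child cannot block on a full pipe.
            output = new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
        int exitCode = process.waitFor(); // Process.waitFor(), not Object.wait()
        return new Result(exitCode, output);
    }

    public static void main(String[] args) throws Exception {
        Result r = run(List.of("/bin/sh", "-c", "docker run --rm ubuntu:16.04 echo hello"));
        if (r.exitCode != 0) {
            System.err.println("command failed with exit code " + r.exitCode + ":\n" + r.output);
        } else {
            System.out.print(r.output);
        }
    }
}
```

Logging r.output from inside processElement would surface what the container (or the shell) actually reported.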
On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau <rmannibucau@gmail.com> wrote:
Hi Mahesh,

Did you get the same error? This is quite unrelated to Beam itself, normally. If your engine (Spark, Dataflow, etc.) doesn't have a security manager active, it should be enough; if it does, you may be forbidden from launching processes.
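On the security-manager point: a JVM reports an installed manager through System.getSecurityManager(), and ProcessBuilder.start() calls the manager's checkExec method before starting a subprocess. The sketch below (ExecPermissionCheck is a hypothetical name) probes that check from inside a DoFn or anywhere else. The Security Manager API is deprecated for removal since Java 17, so on recent JDKs getSecurityManager() normally returns null and compiling this produces deprecation warnings.

```java
// Hypothetical probe: reports whether the current JVM's security manager, if
// any, would allow starting the given executable.
public class ExecPermissionCheck {

    @SuppressWarnings("removal") // SecurityManager is deprecated for removal since Java 17
    static boolean canExec(String executable) {
        SecurityManager sm = System.getSecurityManager();
        if (sm == null) {
            return true; // no security manager: the JVM itself does not restrict exec
        }
        try {
            sm.checkExec(executable); // the same check ProcessBuilder.start() performs
            return true;
        } catch (SecurityException e) {
            return false;
        }
    }

    @SuppressWarnings("removal")
    public static void main(String[] args) {
        System.out.println("security manager installed: " + (System.getSecurityManager() != null));
        System.out.println("may exec /bin/bash: " + canExec("/bin/bash"));
    }
}
```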
On Mon, Aug 20, 2018 at 16:08, Mahesh Vangala <vangalamaheshh@gmail.com> wrote:
Hello Romain -

I did try that; still no luck.
Also, when I put the process start logic into a separate test script, I do see a successfully started Docker container with "docker ps".
However, I have no such luck implementing that logic within a DoFn.
Any thoughts?
Thank you.

Regards,
Mahesh

On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <rmannibucau@gmail.com> wrote:
waitFor() and not the Java wait() primitive?
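Romain's point, illustrated: Object.wait() is a monitor primitive that must be called while holding the object's lock (inside a synchronized block), otherwise it throws IllegalMonitorStateException, which is the "Caused by" in the stack trace of the original message. Process.waitFor() is the method that blocks until the child process exits and returns its exit code. A minimal sketch; the class name WaitVsWaitFor is illustrative, and it runs /bin/sh -c "exit 3" instead of Docker so it works without Docker installed:

```java
import java.io.IOException;

public class WaitVsWaitFor {

    /** Calls Object.wait() on a Process without holding its monitor, as the original DoFn did. */
    static boolean objectWaitThrows(Process p) throws InterruptedException {
        try {
            p.wait(); // not synchronized on p, so this throws immediately
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Starts the command and blocks until it terminates, returning its exit code. */
    static int runAndWaitFor(String... command) throws IOException, InterruptedException {
        Process p = new ProcessBuilder(command).inheritIO().start();
        return p.waitFor();
    }

    public static void main(String[] args) throws Exception {
        Process p = new ProcessBuilder("/bin/sh", "-c", "exit 0").start();
        System.out.println("Object.wait() threw IllegalMonitorStateException: " + objectWaitThrows(p));
        p.waitFor();
        System.out.println("exit code via waitFor(): " + runAndWaitFor("/bin/sh", "-c", "exit 3"));
    }
}
```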
On Sun, Aug 19, 2018 at 04:35, Mahesh Vangala <vangalamaheshh@gmail.com> wrote:
Hello Beamers -

I am trying to pull off a POC: launch a Docker image per element in the input PCollection and then return some data to the output PCollection.

Here is my code:

package pipelines.variant_caller;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class VariantCaller
{
    public static void main(String[] args)
    {
        PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(opts);

        // Read input lines, launch one Docker job per line, then merge the results.
        PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
        PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
        // AddLines is defined elsewhere in the poster's project (not shown in the thread).
        PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
        mergedLines.apply(TextIO.write().to("test_out.csv"));

        p.run();
    }
}


My LaunchDocker Code:


package pipelines.variant_caller;

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LaunchDocker {
  public static class LaunchJobs extends DoFn<String, String> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(AddLines.class);

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      // Get the input element from ProcessContext.
      String word = c.element().split(",")[0];
      LOG.info(word);

      ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
          "docker run --rm ubuntu:16.04 sleep 20");
      // Object.wait(), not Process.waitFor(): this call throws the
      // IllegalMonitorStateException shown in the trace below.
      pb.start().wait();

      // Use ProcessContext.output to emit the output element.
      if (!word.isEmpty())
        c.output(word + "\n");
    }
  }
}


However, this fails with the following error:


Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern test_in.csv matched 1 files with total size 36
Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: sample1
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: 4
Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
INFO: 1
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
Caused by: java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)


Can you share your ideas on the best way to achieve this?

Thank you for your help!


Sincerely,

Mahesh
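One possible shape for the per-element launch, sketched under assumptions (Java 9+, docker reachable at the given path; DockerJob, buildCommand and runWithTimeout are hypothetical names, not a Beam API): build the docker run command as an argument list so no shell is involved, wait with Process.waitFor(long, TimeUnit) so a hung container cannot block the bundle indefinitely, and treat a non-zero exit code as a failure of that element.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for launching one container per element; plain JDK, no Beam types.
public class DockerJob {

    /** Builds "docker run --rm <image> <args...>" as a list, so no shell quoting is involved. */
    static List<String> buildCommand(String dockerBinary, String image, String... containerArgs) {
        List<String> cmd = new ArrayList<>(List.of(dockerBinary, "run", "--rm", image));
        cmd.addAll(List.of(containerArgs));
        return cmd;
    }

    /** Runs the command with inherited stdout/stderr; fails on timeout or non-zero exit. */
    static void runWithTimeout(List<String> command, long timeoutSeconds)
            throws IOException, InterruptedException {
        Process p = new ProcessBuilder(command).inheritIO().start();
        if (!p.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            p.destroyForcibly(); // kills the client process; the container may need "docker kill"
            throw new IOException("timed out after " + timeoutSeconds + "s: " + command);
        }
        if (p.exitValue() != 0) {
            throw new IOException("exit code " + p.exitValue() + ": " + command);
        }
    }

    public static void main(String[] args) {
        List<String> cmd = buildCommand("/usr/local/bin/docker", "ubuntu:16.04", "sleep", "20");
        System.out.println(cmd);
        // Inside a DoFn's processElement this would be followed by, e.g.:
        //   runWithTimeout(cmd, 60);
        //   c.output(word);
    }
}
```

Throwing from processElement makes the runner retry or fail the bundle, which is usually preferable to silently emitting output for a job that never ran.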


