From: Prateek Maheshwari
Date: Fri, 16 Mar 2018 17:02:22 -0700
Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
To:
 dev@samza.apache.org
Cc: Jagadish Venkatraman, "tom@recursivedream.com", "Yipan@linkedin.com", Yi Pan

Hi Thunder,

Can you please take and attach a thread dump with this?

Thanks,
Prateek

On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges wrote:
> It appears it IS hung while serializing the JobModel... very strange! I added some debug statements around the calls:
>
> LOG.debug("Getting object mapper to serialize job model"); // this IS printed
> ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
> LOG.debug("Serializing job model"); // this IS printed
> String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
> LOG.info("jobModelAsString=" + jobModelStr); // this is NOT printed!
>
> Another thing I noticed is that "getObjectMapper" actually creates the object mapper twice!
>
> 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Getting object mapper to serialize job model
> 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
> 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes and mixins
> 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
> 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes and mixins
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Serializing job model
>
> Could this ObjectMapper be a singleton? I see there is a private static instance, but getObjectMapper creates a new one every time...
>
> Anyway, then it takes off to serialize the job model and never comes back...
>
> Hoping someone has some idea here... the implementation for this mostly comes from Jackson-mapper-asl, and I have the version that is linked in the 0.14.0 tag:
> | | | +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
> | | | | \--- org.codehaus.jackson:jackson-core-asl:1.9.13
>
> Thanks!
> Thunder
>
> -----Original Message-----
> From: Thunder Stumpges [mailto:tstumpges@ntent.com]
> Sent: Friday, March 16, 2018 15:29
> To: dev@samza.apache.org; Jagadish Venkatraman
> Cc: tom@recursivedream.com; Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
> Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>
> So, my investigation starts at StreamProcessor.java. Line 294 in method onNewJobModel() logs an INFO message that it's starting a container. This message never appears.
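Regarding the question above of whether the mapper could be a singleton: the usual Java way to build one shared instance lazily and thread-safely is the initialization-on-demand holder idiom, sketched below. `ExpensiveMapper` is a hypothetical stand-in for something costly to configure, such as the mapper returned by `SamzaObjectMapper.getObjectMapper()`; this is not Samza's code, and sharing one instance is only safe if nothing reconfigures it after it has been handed out.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class MapperHolderDemo {

    // Hypothetical stand-in for an object that is expensive to build and configure.
    static final class ExpensiveMapper {
        static final AtomicInteger CONSTRUCTIONS = new AtomicInteger();

        ExpensiveMapper() {
            CONSTRUCTIONS.incrementAndGet(); // count every construction
        }
    }

    // The JVM initializes Holder (and therefore INSTANCE) once, on first use,
    // and class initialization is guaranteed to be thread-safe.
    private static final class Holder {
        static final ExpensiveMapper INSTANCE = new ExpensiveMapper();
    }

    static ExpensiveMapper getMapper() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        ExpensiveMapper first = getMapper();
        ExpensiveMapper second = getMapper();
        System.out.println("same instance: " + (first == second));                 // same instance: true
        System.out.println("constructions: " + ExpensiveMapper.CONSTRUCTIONS.get()); // constructions: 1
    }
}
```

Unlike a `getObjectMapper()` that builds a new mapper on every call, the holder pays the construction cost once, without explicit locking.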
>
> I see that ZkJobCoordinator calls onNewJobModel from its onNewJobModelConfirmed method, which also logs an info message stating "version X of the job model got confirmed". I never see this message either, so I go up the chain some more.
>
> I DO see:
>
> 2018-03-16 21:43:58 logback 20498 [ZkClient-EventThread-13-10.0.127.114:2181] INFO o.apache.samza.zk.ZkJobCoordinator - ZkJobCoordinator::onBecomeLeader - I became the leader!
> And
> 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO o.apache.samza.zk.ZkJobCoordinator - pid=91e07d20-ae33-4156-a5f3-534a95642133Generated new Job Model. Version = 1
>
> Which led me to method onDoProcessorChange, line 210. I see that line, but not the line below, "Published new Job Model. Version =", so something in here is not completing:
>
> LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
>
> // Publish the new job model
> zkUtils.publishJobModel(nextJMVersion, jobModel);
>
> // Start the barrier for the job model update
> barrier.create(nextJMVersion, currentProcessorIds);
>
> // Notify all processors about the new JobModel by updating JobModel Version number
> zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
>
> LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
>
> As I mentioned, after the line "Generated new Job Model. Version = 1" I just get repeated zk ping responses... no more application logging.
>
> The very next thing that's run is zkUtils.publishJobModel(), which only has two lines before another log statement (which I don't see):
>
> public void publishJobModel(String jobModelVersion, JobModel jobModel) {
>   try {
>     ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>     String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>     LOG.info("jobModelAsString=" + jobModelStr);
>     ...
>
> Could it really be getting hung up on one of these two lines? (Seems like it must be, but I don't see anything there that seems like it would just hang.) I'll keep troubleshooting, maybe add some more debug logging and try again.
>
> Thanks for any guidance you all might have.
> -Thunder
>
>
> -----Original Message-----
> From: Thunder Stumpges [mailto:tstumpges@ntent.com]
> Sent: Friday, March 16, 2018 14:43
> To: dev@samza.apache.org; Jagadish Venkatraman
> Cc: tom@recursivedream.com; Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
> Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>
> Well, I have my stand-alone application in Docker and running in Kubernetes. I think something isn't wired up all the way, though, because my task never actually gets invoked. I see no errors; however, I'm not getting the usual startup logs (checking existing offsets, "entering run loop"...). My logs look like this:
>
> 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Verifying properties
> 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property client.id is overridden to samza_admin-test_stream_task-1
> 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to test-kafka-kafka.test-svc:9092
> 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
> 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO kafka.client.ClientUtils$ - Fetching metadata from broker BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation id 0 for 1 topic(s) Set(dev_k8s.samza.test.topic)
> 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 179680 (requested -1), SO_SNDBUF = 102400 (requested 102400), connectTimeoutMs = 30000.
> 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO kafka.producer.SyncProducer - Connected to test-kafka-kafka.test-svc:9092 for producing
> 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO kafka.producer.SyncProducer - Disconnecting from test-kafka-kafka.test-svc:9092
> 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG kafka.client.ClientUtils$ - Successfully fetched metadata for 1 topic(s) Set(dev_k8s.samza.test.topic)
> 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@1a7158cc has grouped the SystemStreamPartitions into 10 tasks with the following taskNames: [Partition 1, Partition 0, Partition 3, Partition 2, Partition 5, Partition 4, Partition 7, Partition 6, Partition 9, Partition 8]
> 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 0 is being assigned changelog partition 0.
> 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 1 is being assigned changelog partition 1.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 2 is being assigned changelog partition 2.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 3 is being assigned changelog partition 3.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 4 is being assigned changelog partition 4.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 5 is being assigned changelog partition 5.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 6 is being assigned changelog partition 6.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 7 is being assigned changelog partition 7.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 8 is being assigned changelog partition 8.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 9 is being assigned changelog partition 9.
> 2018-03-16 21:05:55 logback 50838 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null serverPath:null finished:false header:: 23,4 replyHeader:: 23,14024,0 request:: '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/JobModelGeneration/jobModelVersion,T response:: ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878}
> 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO o.apache.samza.zk.ZkJobCoordinator - pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated new Job Model. Version = 1
> 2018-03-16 21:06:05 logback 60848 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms
> 2018-03-16 21:06:15 logback 70856 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 1ms
> 2018-03-16 21:06:25 logback 80865 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms
> ...
>
> The zk ping responses continue every 10 seconds, but no other activity or messages occur.
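For the thread dump requested at the top of this message, running the JDK's `jstack <pid>` against the process is the usual route. When attaching to a JVM inside a container is awkward, one hedged alternative is to log the stack of the suspect thread from inside the process using the standard `java.lang.management.ThreadMXBean`. The sketch below assumes the thread name `debounce-thread-0` seen in these logs; the class and method names are illustrative and not part of Samza.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class NamedThreadDump {

    /** Returns the full stack of every live thread whose name equals {@code name}. */
    static String dumpThread(String name) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // (true, true) also collects locked monitors and ownable synchronizers.
        for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
            if (info == null || !name.equals(info.getThreadName())) {
                continue;
            }
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            // ThreadInfo.toString() truncates deep stacks, so print every frame ourselves.
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Simulate a worker that never finishes, named like the thread in the logs.
        Thread stuck = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // interrupted by main(): just exit
            }
        }, "debounce-thread-0");
        stuck.setDaemon(true);
        stuck.start();
        while (stuck.getState() != Thread.State.TIMED_WAITING) {
            Thread.yield(); // wait until the worker is inside sleep()
        }
        System.out.print(dumpThread("debounce-thread-0"));
        stuck.interrupt();
    }
}
```

In a real process one would call `dumpThread("debounce-thread-0")` from a timer or a debug endpoint once the job model generation appears stuck; the top frames show which call it is blocked in.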
> It looks like it gets as far as confirming the JobModel and grouping the partitions, but nothing actually starts up.
>
> Any ideas?
> Thanks in advance!
> Thunder
>
>
> -----Original Message-----
> From: Thunder Stumpges [mailto:tstumpges@ntent.com]
> Sent: Thursday, March 15, 2018 16:35
> To: Jagadish Venkatraman
> Cc: dev@samza.apache.org; tom@recursivedream.com; Yipan@linkedin.com; Yi Pan
> Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>
> Thanks a lot for the info. I have something basically working at this point! I have not integrated it with Docker or Kubernetes yet, but it does run from my local machine.
>
> I have determined that LocalApplicationRunner does NOT do config rewriting. I had to write my own little “StandAloneApplicationRunner” that handles the “main” entrypoint. It does command-line parsing using CommandLine, loads config from ConfigFactory, and performs rewriting before creating the new instance of LocalApplicationRunner. This is all my StandAloneApplicationRunner contains:
>
> object StandAloneSamzaRunner extends App with LazyLogging {
>
>   // parse command line args just like JobRunner.
>   val cmdline = new ApplicationRunnerCommandLine
>   val options = cmdline.parser.parse(args: _*)
>   val config = cmdline.loadConfig(options)
>
>   val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
>   runner.runTask()
>   runner.waitForFinish()
> }
>
> The only config settings I needed to make to use this runner were (easily configured due to our central Consul config system and our rewriter):
>
> # use the ZK based job coordinator
> job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
> # need to use GroupByContainerIds instead of GroupByContainerCount
> task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
> # ZKJC config
> job.coordinator.zk.connect=
>
> I did run into one potential problem: as you see above, I have started the task using runTask() and then, to prevent my main method from returning, I have called waitForFinish(). The first time I ran it, the job itself failed because I had forgotten to override the task grouper, and the container count was pulled from our staging environment. There are some failures logged and it appears the JobCoordinator fails, but it never returns from waitForFinish. The stack trace and continuation of the log are below:
>
> 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: OnProcessorChange failed.
> java.lang.IllegalArgumentException: Your container count (4) is larger than your task count (2). Can't have containers with nothing to do, so aborting.
>     at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:212)
>     at org.apache.samza.container.grouper.task.GroupByContainerCount.group(GroupByContainerCount.java:62)
>     at org.apache.samza.container.grouper.task.TaskNameGrouper.group(TaskNameGrouper.java:56)
>     at org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:266)
>     at org.apache.samza.coordinator.JobModelManager.readJobModel(JobModelManager.scala)
>     at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJobCoordinator.java:306)
>     at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:197)
>     at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:318)
>     at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134)
>     at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG o.a.samza.processor.StreamProcessor - Container is not instantiated yet.
> 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG org.I0Itec.zkclient.ZkClient - Closing ZkClient...
> 2018-03-15 22:34:32 logback 77789 [ZkClient-EventThread-15-10.0.127.114:2181] INFO org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
>
> And then the application continues on with metric reporters and other debug logging (not actually running the task, though).
>
> Thanks in advance for the guidance; this has been easier than I imagined! I’ll report back when I get more of the Dockerization/Kubernetes running and test it a bit more.
> Cheers,
> Thunder
>
>
> From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com]
> Sent: Thursday, March 15, 2018 14:46
> To: Thunder Stumpges
> Cc: dev@samza.apache.org; tom@recursivedream.com; Yipan@linkedin.com; Yi Pan
> Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>
> >> Thanks for the info on the tradeoffs. That makes a lot of sense. I am on-board with using ZkJobCoordinator; sounds like some good benefits over just the Kafka high-level consumer.
>
> This certainly looks like the simplest alternative.
>
> For your other questions, please find my answers inline.
>
> >> Q1: If I use LocalApplicationRunner, it does not use "ProcessJobFactory" (or any StreamJob or *Job classes), correct?
>
> Your understanding is correct. It directly instantiates the StreamProcessor, which in turn creates and runs the SamzaContainer.
>
> >> Q2: If I use LocalApplicationRunner, I will need to code myself the loading and rewriting of the Config that is currently handled by JobRunner, correct?
>
> I don't think you'll need to do this. IIUC, the LocalApplicationRunner should automatically invoke rewriters and do the right thing.
>
> >> Q3: Do I need to also handle coordinator stream(s) and storing of config that is done in JobRunner (I don’t think so as the ?
>
> I don't think this is necessary either. The creation of the coordinator stream and the persisting of configuration happen in the LocalApplicationRunner (more specifically in StreamManager#createStreams).
>
> >> Q4: Where/How do I specify the Container ID for each instance? Is there a config setting that I can pass (or pull from an env variable and add to the config)? I am assuming it is my responsibility to ensure that each instance is started with a unique container ID...?
>
> Nope. If you are using the ZkJobCoordinator, you need not worry about assigning IDs for each instance. The framework will automatically take care of generating IDs and reaching consensus by electing a leader. If you are curious, please take a look at implementations of the ProcessorIdGenerator interface.
>
> Please let us know should you have further questions!
>
> Best,
> Jagdish
>
> On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges wrote:
> > Thanks for the info on the tradeoffs. That makes a lot of sense. I am on-board with using ZkJobCoordinator; sounds like some good benefits over just the Kafka high-level consumer.
> >
> > To that end, I have made some notes on possible approaches based on the previous thread, and from my look into the code. I’d love to get feedback.
> >
> > Approach 1. Configure jobs to use “ProcessJobFactory” and run instances of the job using run-job.sh or using JobRunner directly.
> > I don’t think this makes sense from what I can see, for a few reasons:
> > * JobRunner is concerned with stuff I don't *think* we need:
> >   * coordinatorSystemProducer|Consumer,
> >   * writing/reading the configuration to the coordinator streams
> > * ProcessJobFactory hard-codes the ID to “0”, so I don’t think that will work for multiple instances.
> >
> > Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and invoke LocalApplicationRunner.runTask()
> >
> > Q1: If I use LocalApplicationRunner, it does not use "ProcessJobFactory" (or any StreamJob or *Job classes), correct?
> > Q2: If I use LocalApplicationRunner, I will need to code myself the loading and rewriting of the Config that is currently handled by JobRunner, correct?
> > Q3: Do I need to also handle coordinator stream(s) and storing of config that is done in JobRunner (I don’t think so as the ?
> > Q4: Where/How do I specify the Container ID for each instance? Is there a config setting that I can pass (or pull from an env variable and add to the config)? I am assuming it is my responsibility to ensure that each instance is started with a unique container ID...?
> >
> > I am getting started on the above (Approach 2), and looking closer at the code, so I may have my own answers to my questions, but figured I should go ahead and ask now anyway. Thanks!
> > -Thunder
>
>
> From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com]
> Sent: Thursday, March 15, 2018 1:41
> To: dev@samza.apache.org; Thunder Stumpges <tstumpges@ntent.com>; tom@recursivedream.com
> Cc: Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
> Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>
> >> You are correct that this is focused on the higher-level API but doesn't preclude using the lower-level API. I was at the same point you were not long ago, in fact, and had a very productive conversation on the list
>
> Thanks Tom for linking the thread, and I'm glad that you were able to get Kubernetes integration working with Samza.
>
> >> If it is helpful for everyone, once I get the low-level API + ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an additional sample for hello-samza.
>
> @Thunder Stumpges:
> We'd be thrilled to receive your contribution. Examples, demos, tutorials, etc. contribute a great deal to improving the ease of use of Apache Samza. I'm happy to shepherd design discussions/code reviews in the open source, including answering any questions you may have.
>
> >> One thing I'm still curious about is what are the drawbacks or complexities of leveraging the Kafka High-level consumer + PassthroughJobCoordinator in a stand-alone setup like this? We do have Zookeeper (because of Kafka), so I think either would work. The Kafka High-level consumer comes with other nice tools for monitoring offsets, lag, etc.
>
> @Thunder Stumpges:
>
> Samza uses a "Job-Coordinator" to assign your input partitions among the different instances of your application s.t. they don't overlap. A typical way to solve this "partition distribution" problem is to have a single instance elected as a "leader" and have the leader assign partitions to the group.
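The leader-based scheme described above can be sketched in plain Java. In the common ZooKeeper leader-election recipe, each processor registers an ephemeral sequential node and the one with the lowest sequence number acts as leader; the leader then splits the partitions among all registered processors so that no two overlap. The sketch below simulates the registered node names in memory and uses a simple round-robin split. The node naming, the round-robin policy, and all class and method names are illustrative assumptions, not the ZkJobCoordinator or GroupByContainerIds implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LeaderAssignmentSketch {

    // Sequential nodes look like "processor-0000000007"; the numeric suffix orders them.
    static long sequenceOf(String node) {
        return Long.parseLong(node.substring(node.lastIndexOf('-') + 1));
    }

    /** The participant that registered first (lowest sequence number) is the leader. */
    static String electLeader(List<String> nodes) {
        return nodes.stream()
                .min(Comparator.comparingLong(LeaderAssignmentSketch::sequenceOf))
                .orElseThrow(() -> new IllegalStateException("no participants"));
    }

    /** Run by the leader only: deal partitions round-robin so each has exactly one owner. */
    static Map<String, List<Integer>> assign(List<String> nodes, int partitionCount) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("no processors to assign to");
        }
        List<String> sorted = new ArrayList<>(nodes);
        sorted.sort(Comparator.comparingLong(LeaderAssignmentSketch::sequenceOf));
        Map<String, List<Integer>> plan = new LinkedHashMap<>();
        for (String node : sorted) {
            plan.put(node, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            plan.get(sorted.get(p % sorted.size())).add(p);
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList(
                "processor-0000000012", "processor-0000000003", "processor-0000000007");
        System.out.println("leader = " + electLeader(nodes));
        // leader = processor-0000000003
        System.out.println(assign(nodes, 10));
        // {processor-0000000003=[0, 3, 6, 9], processor-0000000007=[1, 4, 7], processor-0000000012=[2, 5, 8]}
    }
}
```

Because only the leader computes the plan and then publishes it, every processor ends up with the same non-overlapping view even if they joined at different times.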
> The ZkJobCoordinator uses Zk primitives to achieve this, while the YarnJC relies on Yarn's guarantee that there will be a singleton AppMaster to achieve this.
>
> A key difference that separates the PassthroughJC from the Yarn/Zk variants is that it does _not_ attempt to solve the "partition distribution" problem. As a result, there's no leader election involved. Instead, it pushes the problem of "partition distribution" to the underlying consumer.
>
> The PassthroughJC supports these 2 scenarios:
>
> 1. Consumer-managed partition distribution: When using the Kafka high-level consumer (or an AWS KinesisClientLibrary consumer) with Samza, the consumer manages partitions internally.
>
> 2. Static partition distribution: Alternately, partitions can be managed statically using configuration. You can achieve static partition assignment by implementing a custom SystemStreamPartitionGrouper <https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html> and TaskNameGrouper <https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>.
> Solutions in this category will typically require you to distinguish the various processors in the group by providing an "id" for each. Once the "id"s are decided, you can then statically compute assignments using a function (eg: modulo N). You can rely on the following mechanisms to provide this id:
> - Configure each instance differently to have its own id
> - Obtain the id from the cluster manager. For instance, Kubernetes will provide each pod a unique id in the range [0,N). AWS ECS should expose similar capabilities via a REST endpoint.
>
> >> One thing I'm still curious about is what are the drawbacks or complexities of leveraging the Kafka High-level consumer + PassthroughJobCoordinator in a stand-alone setup like this?
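The static option described above needs no leader: each processor derives its own share from its id and the total instance count N, using modulo N. A minimal sketch, assuming the id is the ordinal at the end of a Kubernetes StatefulSet pod hostname (the hostname `samza-worker-3` is hypothetical) and that N and the partition count come from configuration; the helper names are illustrative, not Samza APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class StaticAssignmentSketch {

    /** Parses the trailing ordinal from a hostname such as "samza-worker-3". */
    static int ordinalFromHostname(String hostname) {
        int dash = hostname.lastIndexOf('-');
        if (dash < 0 || dash == hostname.length() - 1) {
            throw new IllegalArgumentException("no ordinal suffix in " + hostname);
        }
        return Integer.parseInt(hostname.substring(dash + 1));
    }

    /** Partitions owned by processor {@code id} out of {@code n}: every p with p % n == id. */
    static List<Integer> partitionsFor(int id, int n, int partitionCount) {
        if (id < 0 || id >= n) {
            throw new IllegalArgumentException("id must be in [0, " + n + ")");
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = id; p < partitionCount; p += n) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        int n = 4;               // number of instances, assumed to come from config
        int partitionCount = 10; // example value
        for (int id = 0; id < n; id++) {
            String host = "samza-worker-" + id; // hypothetical pod hostname
            System.out.println(host + " -> " + partitionsFor(ordinalFromHostname(host), n, partitionCount));
        }
        // samza-worker-0 -> [0, 4, 8]
        // samza-worker-1 -> [1, 5, 9]
        // samza-worker-2 -> [2, 6]
        // samza-worker-3 -> [3, 7]
    }
}
```

Every instance must be configured with the same N; changing it moves partitions between instances, which is why stateful jobs need the ids chosen carefully.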
>
> Leveraging the Kafka High-level consumer:
>
> The Kafka high-level consumer is not integrated into Samza just yet. Instead, Samza's integration with Kafka uses the low-level consumer because:
> i) It allows for greater control in fetching data from individual brokers. It is simple and performant in terms of the threading model to have one thread pull from each broker.
> ii) It is efficient in memory utilization since it does not do internal buffering of messages.
> iii) There's no overhead like Kafka-controller heart-beats that are driven by consumer.poll
>
> Since there's no built-in integration, you will have to build a new SystemConsumer if you need to integrate with the Kafka High-level consumer. Further, there's a fair bit more complexity to manage in checkpointing.
>
> >> The Kafka High-level consumer comes with other nice tools for monitoring offsets, lag, etc
>
> Samza exposes the below metrics (samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala) for lag monitoring:
> - The current log-end offset for each partition
> - The last check-pointed offset for each partition
> - The number of messages behind the highwatermark of the partition
>
> Please let us know if you need help discovering these or integrating these with other systems/tools.
>
>
> Leveraging the Passthrough JobCoordinator:
>
> It's helpful to split this discussion on tradeoffs with PassthroughJC into 2 parts:
>
> 1. PassthroughJC + consumer-managed partitions:
>
> - In this model, Samza has no control over partition assignment since it's managed by the consumer. This means that stateful operations like joins that rely on partitions being co-located on the same task will not work. Simple stateless operations (eg: map, filter, remote lookups) are fine.
>
> - A key differentiator between Samza and other frameworks is our support for "host affinity" (yarn/yarn-host-affinity.html). Samza achieves this by assigning partitions to hosts taking data locality into account. If the consumer can arbitrarily shuffle partitions, it'd be hard to support this affinity/locality. Often this is a key optimization when dealing with large stateful jobs.
>
> 2. PassthroughJC + static partitions:
>
> - In this model, it is possible to make stateful processing (including host affinity) work by carefully choosing how "id"s are assigned and computed.
>
> Recommendation:
>
> - Owing to the above subtleties, I would recommend that we give the ZkJobCoordinator + the built-in low-level Kafka integration a try.
> - If we hit snags down this path, we can certainly explore the approach with PassthroughJC + static partitions.
> - Using the PassthroughJC + consumer-managed distribution would be least preferable owing to the subtleties I outlined above.
>
> Please let us know should you have more questions.
>
> Best,
> Jagdish
>
> On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges wrote:
> Wow, what great timing, and what a great thread! I definitely have some good starters to go off of here.
>
> If it is helpful for everyone, once I get the low-level API + ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an additional sample for hello-samza.
>
> One thing I'm still curious about is what are the drawbacks or complexities of leveraging the Kafka High-level consumer + PassthroughJobCoordinator in a stand-alone setup like this? We do have Zookeeper (because of Kafka), so I think either would work. The Kafka High-level consumer comes with other nice tools for monitoring offsets, lag, etc....
>
> Thanks guys!
> -Thunder
>
> -----Original Message-----
> From: Tom Davis [mailto:tom@recursivedream.com]
> Sent: Wednesday, March 14, 2018 17:50
> To: dev@samza.apache.org
> Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>
> Hey there!
>
> You are correct that this is focused on the higher-level API but doesn't preclude using the lower-level API. I was at the same point you were not long ago, in fact, and had a very productive conversation on the list: you should look for "Question about custom StreamJob/Factory" in the list archive for the past couple months.
>
> I'll quote Jagadish Venkatraman from that thread:
>
> > For the section on the low-level API, can you use LocalApplicationRunner#runTask()? It basically creates a new StreamProcessor and runs it. Remember to provide task.class and set it to your implementation of StreamTask or AsyncStreamTask. Please note that this is an evolving API and hence, subject to change.
>
> I ended up just switching to the high-level API because I don't have any existing Tasks and the Kubernetes story is a little more straightforward there (there's only one container/configuration to deploy).
>
> Best,
>
> Tom
>
> Thunder Stumpges writes:
>
> > Hi all,
> >
> > We are using Samza (0.12.0) in about 2 dozen jobs implementing several processing pipelines. We have also begun a significant move of other services within our company to Docker/Kubernetes. Right now our Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce" jobs (many reporting and other batch processing jobs). We would really like to move our stream processing off of Hadoop/Yarn and onto Kubernetes.
> >
> > When I just read about some of the new progress in .13 and .14 I got really excited! We would love to have our jobs run as simple libraries in our own JVM, and use the Kafka High-Level-Consumer for partition distribution and such. This would let us "dockerfy" our application and run/scale in Kubernetes.
> >
> > However, as I read it, this new deployment model is ONLY for the new(er) High Level API, correct? Is there a plan and/or resources for adapting this back to existing low-level tasks? How complicated of a task is that? Do I have any other options to make this transition easier?
> >
> > Thanks in advance.
> > Thunder
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University