From: Jagadish Venkatraman
Date: Tue, 20 Mar 2018 11:17:12 -0700
Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
To: Prateek Maheshwari
Cc: dev@samza.apache.org, tom@recursivedream.com, Yipan@linkedin.com, Yi Pan

Hi Thunder,

Thank you for the PR. Really nice work! Since you have a working
implementation on K8s, would you be willing to contribute a short tutorial /
a post on this? We'll be sure to feature it on the official Samza website at
http://samza.apache.org/. It'd be a great addition to the Samza community to
have a section on K8s integration! There have been multiple prior asks on
this, and your learnings would be super-helpful.

Best,
Jagdish

On Tue, Mar 20, 2018 at 10:46 AM, Prateek Maheshwari wrote:

> Glad you were able to figure it out, that was very confusing. Thanks for
> the fix too.
>
> - Prateek
>
> On Mon, Mar 19, 2018 at 9:58 PM, Thunder Stumpges wrote:
>
>> And that last issue was mine. My setting override was not picked up and
>> it was using GroupByContainerCount instead.
>> -Thanks,
>> Thunder
>>
>> -----Original Message-----
>> From: Thunder Stumpges
>> Sent: Monday, March 19, 2018 20:58
>> To: dev@samza.apache.org
>> Cc: Jagadish Venkatraman; tom@recursivedream.com; Yipan@linkedin.com; Yi Pan
>> Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>
>> Well I figured it out. My specific issue was due to a simple dependency
>> problem where I had gotten an older version of the Jackson-mapper library.
>> However, the code was throwing NoSuchMethodError (an Error instead of an
>> Exception) and it was being silently dropped. I created a pull request to
>> handle any Throwable in ScheduleAfterDebounceTime.
>> https://github.com/apache/samza/pull/450
>>
>> I'm now running into an issue with the generation of the JobModel and the
>> ProcessorId.
>> The ZkJobCoordinator has a ProcessorId that is a GUID, but when the
>> GroupByContainerIds class (my TaskNameGrouper) creates the
>> ContainerModels, it uses the ContainerId (a numeric value: 0, 1, 2, etc.)
>> as the ProcessorId (~ line 105). As a result, the JobModel that is
>> generated and published immediately causes the processor to quit with this
>> message:
>>
>> INFO o.apache.samza.zk.ZkJobCoordinator - New JobModel does not contain
>> pid=38c637bf-9c2b-4856-afc4-5b1562711cfb. Stopping this processor.
>>
>> I was assuming I should be using GroupByContainerIds as my
>> TaskNameGrouper. I don't see any other promising implementations. Am I
>> just missing something?
>>
>> Thanks,
>> Thunder
>>
>> JobModel
>> {
>>   "config" : {
>>     ...
>>   },
>>   "containers" : {
>>     "0" : {
>>       "tasks" : {
>>         "Partition 0" : {
>>           "task-name" : "Partition 0",
>>           "system-stream-partitions" : [ {
>>             "system" : "kafka",
>>             "partition" : 0,
>>             "stream" : "test_topic1"
>>           }, {
>>             "system" : "kafka",
>>             "partition" : 0,
>>             "stream" : "test_topic2"
>>           } ],
>>           "changelog-partition" : 0
>>         },
>>         "Partition 1" : {
>>           "task-name" : "Partition 1",
>>           "system-stream-partitions" : [ {
>>             "system" : "kafka",
>>             "partition" : 1,
>>             "stream" : "test_topic1"
>>           }, {
>>             "system" : "kafka",
>>             "partition" : 1,
>>             "stream" : "test_topic2"
>>           } ],
>>           "changelog-partition" : 1
>>         }
>>       },
>>       "container-id" : 0,
>>       "processor-id" : "0"
>>     }
>>   },
>>   "max-change-log-stream-partitions" : 2,
>>   "all-container-locality" : {
>>     "0" : null
>>   }
>> }
>>
>> -----Original Message-----
>> From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>> Sent: Friday, March 16, 2018 18:21
>> To: dev@samza.apache.org
>> Cc: Jagadish Venkatraman; tom@recursivedream.com; Yipan@linkedin.com; Yi Pan
>> Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>
>> Attached. I don't see any threads actually running this code, which is odd.
>>
>> There's my main thread that's waiting for the whole thing to finish; the
>> "debounce-thread-0" (which logged the other surrounding messages below) has
>> this:
>>
>> "debounce-thread-0" #18 daemon prio=5 os_prio=0 tid=0x00007fa0fd719800 nid=0x21 waiting on condition [0x00007fa0d0d45000]
>>    java.lang.Thread.State: WAITING (parking)
>>     at sun.misc.Unsafe.park(Native Method)
>>     - parking to wait for <0x00000006f166e350> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>>    Locked ownable synchronizers:
>>     - None
>>
>> Thanks for having a look.
>> Thunder
>>
>> -----Original Message-----
>> From: Prateek Maheshwari [mailto:prateekmi2@gmail.com]
>> Sent: Friday, March 16, 2018 17:02
>> To: dev@samza.apache.org
>> Cc: Jagadish Venkatraman; tom@recursivedream.com; Yipan@linkedin.com; Yi Pan
>> Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>>
>> Hi Thunder,
>>
>> Can you please take and attach a thread dump with this?
>>
>> Thanks,
>> Prateek
>>
>> On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges wrote:
>>
>> > It appears it IS hung while serializing the JobModel... very strange!
>> > I added some debug statements around the calls:
>> >
>> >     LOG.debug("Getting object mapper to serialize job model"); // this IS printed
>> >     ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>> >     LOG.debug("Serializing job model"); // this IS printed
>> >     String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>> >     LOG.info("jobModelAsString=" + jobModelStr); // this is NOT printed!
>> >
>> > Another thing I noticed is that "getObjectMapper" actually creates the
>> > object mapper twice!
>> >
>> > 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Getting object mapper to serialize job model
>> > 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
>> > 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes and mixins
>> > 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
>> > 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes and mixins
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Serializing job model
>> >
>> > Could this ObjectMapper be a singleton? I see there is a private
>> > static instance, but getObjectMapper creates a new one every time...
>> >
>> > Anyway, it then takes off to serialize the job model and never comes
>> > back...
>> >
>> > Hoping someone has some idea here... the implementation for this
>> > mostly comes from jackson-mapper-asl, and I have the version that is
>> > linked in the 0.14.0 tag:
>> > | | | +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
>> > | | | | \--- org.codehaus.jackson:jackson-core-asl:1.9.13
>> >
>> > Thanks!
>> > Thunder
>> >
>> > -----Original Message-----
>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>> > Sent: Friday, March 16, 2018 15:29
>> > To: dev@samza.apache.org; Jagadish Venkatraman
>> > Cc: tom@recursivedream.com; Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>> > Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>> >
>> > So, my investigation starts at StreamProcessor.java. Line 294 in
>> > method onNewJobModel() logs an INFO message that it's starting a container.
>> > This message never appears.
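The singleton question raised above is a fair one: the DEBUG lines show the mapper being fully built twice in one call chain. A caching `getObjectMapper()` would avoid the repeated setup. The sketch below uses the standard lazy-holder idiom with a stand-in `Mapper` type; all names are illustrative and this is not Samza's actual `SamzaObjectMapper` code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Lazy-holder idiom: the expensive object is built exactly once, on first
// use, with thread safety guaranteed by JLS class-initialization rules.
public class MapperHolder {
    static final AtomicInteger builds = new AtomicInteger();

    static class Mapper { }  // placeholder for a real ObjectMapper

    // Inner class is not initialized until Holder.INSTANCE is first touched.
    private static class Holder {
        static final Mapper INSTANCE = buildMapper();
    }

    static Mapper buildMapper() {
        builds.incrementAndGet();  // expensive setup would happen here (SerDes, mixins, ...)
        return new Mapper();
    }

    static Mapper getObjectMapper() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        Mapper a = getObjectMapper();
        Mapper b = getObjectMapper();
        System.out.println(a == b);        // prints true: same instance
        System.out.println(builds.get());  // prints 1: built exactly once
    }
}
```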
>> >
>> > I see that ZkJobCoordinator calls onNewJobModel from its
>> > onNewJobModelConfirmed method, which also logs an info message stating
>> > "version X of the job model got confirmed". I never see this message
>> > either, so I go up the chain some more.
>> >
>> > I DO see:
>> >
>> > 2018-03-16 21:43:58 logback 20498 [ZkClient-EventThread-13-10.0.127.114:2181] INFO o.apache.samza.zk.ZkJobCoordinator - ZkJobCoordinator::onBecomeLeader - I became the leader!
>> >
>> > And:
>> >
>> > 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO o.apache.samza.zk.ZkJobCoordinator - pid=91e07d20-ae33-4156-a5f3-534a95642133Generated new Job Model. Version = 1
>> >
>> > Which led me to method onDoProcessorChange, line 210. I see that line
>> > logged, but not the line below ("Published new Job Model. Version ="),
>> > so something in here is not completing:
>> >
>> >     LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
>> >
>> >     // Publish the new job model
>> >     zkUtils.publishJobModel(nextJMVersion, jobModel);
>> >
>> >     // Start the barrier for the job model update
>> >     barrier.create(nextJMVersion, currentProcessorIds);
>> >
>> >     // Notify all processors about the new JobModel by updating JobModel Version number
>> >     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
>> >
>> >     LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
>> >
>> > As I mentioned, after the line "Generated new Job Model. Version = 1"
>> > I just get repeated zk ping responses... no more application logging.
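The "no more application logging" stall described above was ultimately traced (per the fix mentioned at the top of the thread, PR #450) to an Error thrown inside the debounce executor. The underlying JDK behavior can be reproduced in isolation: anything thrown by a task submitted to an ExecutorService is captured inside the FutureTask and surfaces only if someone calls `get()` — or if the task itself catches Throwable, which is what the fix does. Names below are illustrative.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

// Minimal reproduction of the silent hang: an Error thrown in a scheduled
// task is swallowed by the executor; no stack trace is logged anywhere.
public class SwallowedError {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

        // The task dies with an Error (not an Exception). Nothing is printed
        // and no uncaught-exception handler fires.
        Future<?> f = exec.submit((Runnable) () -> {
            throw new NoSuchMethodError("missing Jackson method");
        });

        try {
            f.get();  // only here does the captured Error surface
        } catch (ExecutionException e) {
            System.out.println("captured: " + e.getCause());
        }
        exec.shutdown();
    }
}
```

This is why catching only `Exception` inside the scheduled action was not enough: a `NoSuchMethodError` sails past it, the action never completes, and the debounce thread just goes back to waiting on its queue — exactly the parked state shown in the thread dump.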
>> >
>> > The very next thing that's run is zkUtils.publishJobModel(), which only
>> > has two lines before another log statement (which I don't see):
>> >
>> >     public void publishJobModel(String jobModelVersion, JobModel jobModel) {
>> >       try {
>> >         ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>> >         String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>> >         LOG.info("jobModelAsString=" + jobModelStr);
>> >         ...
>> >
>> > Could it really be getting hung up on one of these two lines? (It seems
>> > like it must be, but I don't see anything there that seems like it
>> > would just hang.) I'll keep troubleshooting, maybe add some more debug
>> > logging and try again.
>> >
>> > Thanks for any guidance you all might have.
>> > -Thunder
>> >
>> > -----Original Message-----
>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>> > Sent: Friday, March 16, 2018 14:43
>> > To: dev@samza.apache.org; Jagadish Venkatraman
>> > Cc: tom@recursivedream.com; Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>> > Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>> >
>> > Well, I have my stand-alone application in Docker and running in
>> > Kubernetes. I think something isn't wired up all the way, though,
>> > because my task never actually gets invoked. I see no errors; however,
>> > I'm not getting the usual startup logs (checking existing offsets,
>> > "entering run loop"...).
>> > My logs look like this:
>> >
>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Verifying properties
>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property client.id is overridden to samza_admin-test_stream_task-1
>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to test-kafka-kafka.test-svc:9092
>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
>> > 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO kafka.client.ClientUtils$ - Fetching metadata from broker BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation id 0 for 1 topic(s) Set(dev_k8s.samza.test.topic)
>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 179680 (requested -1), SO_SNDBUF = 102400 (requested 102400), connectTimeoutMs = 30000.
>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO kafka.producer.SyncProducer - Connected to test-kafka-kafka.test-svc:9092 for producing
>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO kafka.producer.SyncProducer - Disconnecting from test-kafka-kafka.test-svc:9092
>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG kafka.client.ClientUtils$ - Successfully fetched metadata for 1 topic(s) Set(dev_k8s.samza.test.topic)
>> > 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@1a7158cc has grouped the SystemStreamPartitions into 10 tasks with the following taskNames: [Partition 1, Partition 0, Partition 3, Partition 2, Partition 5, Partition 4, Partition 7, Partition 6, Partition 9, Partition 8]
>> > 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 0 is being assigned changelog partition 0.
>> > 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 1 is being assigned changelog partition 1.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 2 is being assigned changelog partition 2.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 3 is being assigned changelog partition 3.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 4 is being assigned changelog partition 4.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 5 is being assigned changelog partition 5.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 6 is being assigned changelog partition 6.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 7 is being assigned changelog partition 7.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 8 is being assigned changelog partition 8.
>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 9 is being assigned changelog partition 9.
>> > 2018-03-16 21:05:55 logback 50838 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null serverPath:null finished:false header:: 23,4 replyHeader:: 23,14024,0 request:: '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/JobModelGeneration/jobModelVersion,T response:: ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878}
>> > 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO o.apache.samza.zk.ZkJobCoordinator - pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated new Job Model. Version = 1
>> > 2018-03-16 21:06:05 logback 60848 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms
>> > 2018-03-16 21:06:15 logback 70856 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 1ms
>> > 2018-03-16 21:06:25 logback 80865 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms
>> > ...
>> >
>> > The zk ping responses continue every 10 seconds, but no other activity
>> > or messages occur.
>> > It looks like it gets as far as confirming the JobModel and grouping
>> > the partitions, but nothing actually starts up.
>> >
>> > Any ideas?
>> > Thanks in advance!
>> > Thunder
>> >
>> > -----Original Message-----
>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>> > Sent: Thursday, March 15, 2018 16:35
>> > To: Jagadish Venkatraman
>> > Cc: dev@samza.apache.org; tom@recursivedream.com; Yipan@linkedin.com; Yi Pan
>> > Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>> >
>> > Thanks a lot for the info. I have something basically working at this
>> > point! I have not integrated it with Docker or Kubernetes yet, but it
>> > does run from my local machine.
>> >
>> > I have determined that LocalApplicationRunner does NOT do config
>> > rewriting. I had to write my own little "StandAloneApplicationRunner"
>> > that handles the "main" entrypoint. It does command-line parsing using
>> > CommandLine, loads config from ConfigFactory, and performs rewriting
>> > before creating the new instance of LocalApplicationRunner. This is
>> > all my StandAloneApplicationRunner contains:
>> >
>> > object StandAloneSamzaRunner extends App with LazyLogging {
>> >
>> >   // parse command line args just like JobRunner.
>> >   val cmdline = new ApplicationRunnerCommandLine
>> >   val options = cmdline.parser.parse(args: _*)
>> >   val config = cmdline.loadConfig(options)
>> >
>> >   val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
>> >   runner.runTask()
>> >   runner.waitForFinish()
>> > }
>> >
>> > The only config settings I needed to make to use this runner were
>> > (easily configured due to our central Consul config system and our rewriter):
>> >
>> > # use the ZK based job coordinator
>> > job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
>> > # need to use GroupByContainerIds instead of GroupByContainerCount
>> > task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
>> > # ZKJC config
>> > job.coordinator.zk.connect=
>> >
>> > I did run into one potential problem; as you see above, I have started
>> > the task using runTask() and then, to prevent my main method from
>> > returning, I have called waitForFinish(). The first time I ran it, the
>> > job itself failed because I had forgotten to override the task
>> > grouper, and the container count was pulled from our staging environment.
>> > There are some failures logged and it appears the JobCoordinator
>> > fails, but it never returns from waitForFinish. Stack trace and
>> > continuation of the log is below:
>> >
>> > 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: OnProcessorChange failed.
>> > java.lang.IllegalArgumentException: Your container count (4) is larger than your task count (2). Can't have containers with nothing to do, so aborting.
>> >     at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:212)
>> >     at org.apache.samza.container.grouper.task.GroupByContainerCount.group(GroupByContainerCount.java:62)
>> >     at org.apache.samza.container.grouper.task.TaskNameGrouper.group(TaskNameGrouper.java:56)
>> >     at org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:266)
>> >     at org.apache.samza.coordinator.JobModelManager.readJobModel(JobModelManager.scala)
>> >     at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJobCoordinator.java:306)
>> >     at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:197)
>> >     at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:318)
>> >     at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134)
>> >     at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>> >     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>> >     at java.util.concurrent.FutureTask.run(FutureTask.java)
>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> >     at java.lang.Thread.run(Thread.java:745)
>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG o.a.samza.processor.StreamProcessor - Container is not instantiated yet.
>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG org.I0Itec.zkclient.ZkClient - Closing ZkClient...
>> > 2018-03-15 22:34:32 logback 77789 [ZkClient-EventThread-15-10.0.127.114:2181] INFO org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
>> >
>> > And then the application continues on with metric reporters and other
>> > debug logging (not actually running the task, though).
>> >
>> > Thanks in advance for the guidance; this has been easier than I imagined!
>> > I'll report back when I get more of the Dockerization/Kubernetes
>> > running and test it a bit more.
>> > Cheers,
>> > Thunder
>> >
>> > From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com]
>> > Sent: Thursday, March 15, 2018 14:46
>> > To: Thunder Stumpges
>> > Cc: dev@samza.apache.org; tom@recursivedream.com; Yipan@linkedin.com; Yi Pan
>> > Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>> >
>> > >> Thanks for the info on the tradeoffs. That makes a lot of sense. I
>> > >> am on-board with using ZkJobCoordinator, sounds like some good benefits
>> > >> over just the Kafka high-level consumer.
>> >
>> > This certainly looks like the simplest alternative.
>> >
>> > For your other questions, please find my answers inline.
>> >
>> > >> Q1: If I use LocalApplicationRunner, it does not use
>> > >> "ProcessJobFactory" (or any StreamJob or *Job classes), correct?
>> >
>> > Your understanding is correct. It directly instantiates the
>> > StreamProcessor, which in turn creates and runs the SamzaContainer.
>> >
>> > >> Q2: If I use LocalApplicationRunner, I will need to code myself the
>> > >> loading and rewriting of the Config that is currently handled by
>> > >> JobRunner, correct?
>> >
>> > I don't think you'll need to do this. IIUC, the LocalApplicationRunner
>> > should automatically invoke rewriters and do the right thing.
>> >
>> > >> Q3: Do I need to also handle coordinator stream(s) and storing of
>> > >> config that is done in JobRunner (I don't think so as the ?
>> >
>> > I don't think this is necessary either. The creation of the coordinator
>> > stream and the persisting of configuration happen in the
>> > LocalApplicationRunner (more specifically in StreamManager#createStreams).
>> >
>> > >> Q4: Where/How do I specify the Container ID for each instance? Is
>> > >> there a config setting that I can pass (or pull from an env variable
>> > >> and add to the config)? I am assuming it is my responsibility to
>> > >> ensure that each instance is started with a unique container ID..?
>> >
>> > Nope. If you are using the ZkJobCoordinator, you do not have to worry
>> > about assigning IDs for each instance. The framework will
>> > automatically take care of generating IDs and reaching consensus by
>> > electing a leader. If you are curious, please take a look at
>> > implementations of the ProcessorIdGenerator interface.
>> >
>> > Please let us know should you have further questions!
>> >
>> > Best,
>> > Jagdish
>> >
>> > On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges wrote:
>> >
>> > Thanks for the info on the tradeoffs. That makes a lot of sense. I am
>> > on-board with using ZkJobCoordinator, sounds like some good benefits
>> > over just the Kafka high-level consumer.
>> >
>> > To that end, I have made some notes on possible approaches based on
>> > the previous thread and from my look into the code. I'd love to get feedback.
>> >
>> > Approach 1. Configure jobs to use "ProcessJobFactory" and run
>> > instances of the job using run-job.sh or using JobRunner directly.
>> >
>> > I don't think this makes sense, from what I can see, for a few reasons:
>> >
>> > * JobRunner is concerned with stuff I don't *think* we need:
>> >   * coordinatorSystemProducer|Consumer
>> >   * writing/reading the configuration to the coordinator streams
>> > * ProcessJobFactory hard-codes the ID to "0", so I don't think that
>> >   will work for multiple instances.
>> >
>> > Approach 2.
>> > Configure ZkJobCoordinator, GroupByContainerIds, and invoke
>> > LocalApplicationRunner.runTask().
>> >
>> > Q1: If I use LocalApplicationRunner, it does not use
>> > "ProcessJobFactory" (or any StreamJob or *Job classes), correct?
>> >
>> > Q2: If I use LocalApplicationRunner, I will need to code myself
>> > the loading and rewriting of the Config that is currently handled by
>> > JobRunner, correct?
>> >
>> > Q3: Do I need to also handle coordinator stream(s) and storing of
>> > config that is done in JobRunner (I don't think so as the ?
>> >
>> > Q4: Where/How do I specify the Container ID for each instance? Is
>> > there a config setting that I can pass (or pull from an env variable
>> > and add to the config)? I am assuming it is my responsibility to
>> > ensure that each instance is started with a unique container ID..?
>> >
>> > I am getting started on the above (Approach 2) and looking closer at
>> > the code, so I may have my own answers to my questions, but figured I
>> > should go ahead and ask now anyway. Thanks!
>> >
>> > -Thunder
>> >
>> > From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com]
>> > Sent: Thursday, March 15, 2018 1:41
>> > To: dev@samza.apache.org; Thunder Stumpges <tstumpges@ntent.com>; tom@recursivedream.com
>> > Cc: Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>> > Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>> >
>> > >> You are correct that this is focused on the higher-level API but
>> > >> doesn't preclude using the lower-level API. I was at the same point
>> > >> you were not long ago, in fact, and had a very productive
>> > >> conversation on the list.
>> >
>> > Thanks, Tom, for linking the thread, and I'm glad that you were able to
>> > get Kubernetes integration working with Samza.
>> > >> If it is helpful for everyone, once I get the low-level API +
>> > >> ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate
>> > >> an additional sample for hello-samza.
>> >
>> > @Thunder Stumpges:
>> > We'd be thrilled to receive your contribution. Examples, demos,
>> > tutorials etc. contribute a great deal to improving the ease of use
>> > of Apache Samza. I'm happy to shepherd design discussions/code-reviews
>> > in the open source, including answering any questions you may have.
>> >
>> > >> One thing I'm still curious about is: what are the drawbacks or
>> > >> complexities of leveraging the Kafka high-level consumer +
>> > >> PassthroughJobCoordinator in a stand-alone setup like this? We do
>> > >> have Zookeeper (because of Kafka) so I think either would work.
>> > >> The Kafka high-level consumer comes with other nice tools for
>> > >> monitoring offsets, lag, etc.
>> >
>> > @Thunder Stumpges:
>> >
>> > Samza uses a "Job Coordinator" to assign your input partitions among
>> > the different instances of your application s.t. they don't overlap.
>> > A typical way to solve this "partition distribution" problem is to
>> > have a single instance elected as a "leader" and have the leader
>> > assign partitions to the group. The ZkJobCoordinator uses Zk
>> > primitives to achieve this, while the YarnJC relies on Yarn's
>> > guarantee that there will be a singleton AppMaster.
>> >
>> > A key difference that separates the PassthroughJC from the Yarn/Zk
>> > variants is that it does _not_ attempt to solve the "partition
>> > distribution" problem. As a result, there's no leader election
>> > involved. Instead, it pushes the problem of "partition distribution"
>> > to the underlying consumer.
>> >
>> > The PassthroughJC supports these 2 scenarios:
>> >
>> > 1.
>> > Consumer-managed partition distribution: When using the Kafka
>> > high-level consumer (or an AWS KinesisClientLibrary consumer) with
>> > Samza, the consumer manages partitions internally.
>> >
>> > 2. Static partition distribution: Alternately, partitions can be
>> > managed statically using configuration. You can achieve static
>> > partition assignment by implementing a custom
>> > SystemStreamPartitionGrouper
>> > <https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>
>> > and TaskNameGrouper
>> > <github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>.
>> > Solutions in this category will typically require you to distinguish
>> > the various processors in the group by providing an "id" for each.
>> > Once the "id"s are decided, you can then statically compute
>> > assignments using a function (e.g. modulo N).
>> > You can rely on the following mechanisms to provide this id:
>> > - Configure each instance differently to have its own id
>> > - Obtain the id from the cluster-manager. For instance, Kubernetes
>> >   will provide each POD a unique id in the range [0, N). AWS ECS
>> >   should expose similar capabilities via a REST end-point.
>> >
>> > >> One thing I'm still curious about is: what are the drawbacks or
>> > >> complexities of leveraging the Kafka high-level consumer +
>> > >> PassthroughJobCoordinator in a stand-alone setup like this?
>> >
>> > Leveraging the Kafka high-level consumer:
>> >
>> > The Kafka high-level consumer is not integrated into Samza just yet.
>> > Instead, Samza's integration with Kafka uses the low-level consumer
>> > because:
>> > i) It allows for greater control in fetching data from individual
>> > brokers. It is simple and performant, in terms of the threading
>> > model, to have one thread pull from each broker.
>> > ii) It is efficient in memory utilization since it does not do
>> > internal buffering of messages.
>> > iii) There's no overhead like Kafka-controller heart-beats that are
>> > driven by consumer.poll.
>> >
>> > Since there's no built-in integration, you will have to build a new
>> > SystemConsumer if you need to integrate with the Kafka high-level
>> > consumer. Further, there's a fair bit of complexity to manage in
>> > checkpointing.
>> >
>> > >> The Kafka high-level consumer comes with other nice tools for
>> > >> monitoring offsets, lag, etc.
>> >
>> > Samza exposes
>> > <samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
>> > the below metrics for lag monitoring:
>> > - The current log-end offset for each partition
>> > - The last checkpointed offset for each partition
>> > - The number of messages behind the high watermark of the partition
>> >
>> > Please let us know if you need help discovering these or integrating
>> > them with other systems/tools.
>> >
>> > Leveraging the PassthroughJobCoordinator:
>> >
>> > It's helpful to split this discussion on tradeoffs with PassthroughJC
>> > into 2 parts:
>> >
>> > 1. PassthroughJC + consumer-managed partitions:
>> >
>> > - In this model, Samza has no control over partition assignment since
>> >   it's managed by the consumer. This means that stateful operations
>> >   like joins that rely on partitions being co-located on the same
>> >   task will not work. Simple stateless operations (e.g. map, filter,
>> >   remote lookups) are fine.
>> >
>> > - A key differentiator between Samza and other frameworks is our
>> >   support for "host affinity <yarn/yarn-host-affinity.html>". Samza
>> >   achieves this by assigning partitions to hosts taking data locality
>> >   into account. If the consumer can arbitrarily shuffle partitions,
>> >   it would be hard to support this affinity/locality. Often this is a
>> >   key optimization when dealing with large stateful jobs.
>> > 2. PassthroughJC + static partitions:
>> >
>> > - In this model, it is possible to make stateful processing
>> >   (including host affinity) work by carefully choosing how "id"s are
>> >   assigned and computed.
>> >
>> > Recommendation:
>> >
>> > - Owing to the above subtleties, I would recommend that we give the
>> >   ZkJobCoordinator + the built-in low-level Kafka integration a try.
>> > - If we hit snags down this path, we can certainly explore the
>> >   approach with PassthroughJC + static partitions.
>> > - Using the PassthroughJC + consumer-managed distribution would be
>> >   least preferable, owing to the subtleties I outlined above.
>> >
>> > Please let us know should you have more questions.
>> >
>> > Best,
>> > Jagdish
>> >
>> > On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges wrote:
>> >
>> > Wow, what great timing, and what a great thread! I definitely have
>> > some good starters to go off of here.
>> >
>> > If it is helpful for everyone, once I get the low-level API +
>> > ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an
>> > additional sample for hello-samza.
>> >
>> > One thing I'm still curious about is: what are the drawbacks or
>> > complexities of leveraging the Kafka high-level consumer +
>> > PassthroughJobCoordinator in a stand-alone setup like this? We do
>> > have Zookeeper (because of Kafka) so I think either would work. The
>> > Kafka high-level consumer comes with other nice tools for monitoring
>> > offsets, lag, etc....
>> >
>> > Thanks guys!
>> > -Thunder
>> >
>> > -----Original Message-----
>> > From: Tom Davis [mailto:tom@recursivedream.com]
>> > Sent: Wednesday, March 14, 2018 17:50
>> > To: dev@samza.apache.org
>> > Subject: Re: Old style "low level" Tasks with alternative deployment
>> > model(s)
>> >
>> > Hey there!
>> >
>> > You are correct that this is focused on the higher-level API but
>> > doesn't preclude using the lower-level API.
>> > I was at the same point you were not long ago, in fact, and had a
>> > very productive conversation on the list: you should look for
>> > "Question about custom StreamJob/Factory" in the list archive for the
>> > past couple of months.
>> >
>> > I'll quote Jagadish Venkatraman from that thread:
>> >
>> > > For the section on the low-level API, can you use
>> > > LocalApplicationRunner#runTask()? It basically creates a new
>> > > StreamProcessor and runs it. Remember to provide task.class and set
>> > > it to your implementation of StreamTask or AsyncStreamTask. Please
>> > > note that this is an evolving API and hence subject to change.
>> >
>> > I ended up just switching to the high-level API because I don't have
>> > any existing Tasks and the Kubernetes story is a little more
>> > straightforward there (there's only one container/configuration to
>> > deploy).
>> >
>> > Best,
>> >
>> > Tom
>> >
>> > Thunder Stumpges writes:
>> >
>> > > Hi all,
>> > >
>> > > We are using Samza (0.12.0) in about 2 dozen jobs implementing
>> > > several processing pipelines. We have also begun a significant move
>> > > of other services within our company to Docker/Kubernetes. Right
>> > > now our Hadoop/Yarn cluster has a mix of stream and batch
>> > > "Map Reduce" jobs (many reporting and other batch processing jobs).
>> > > We would really like to move our stream processing off of
>> > > Hadoop/Yarn and onto Kubernetes.
>> > >
>> > > When I just read about some of the new progress in 0.13 and 0.14 I
>> > > got really excited! We would love to have our jobs run as simple
>> > > libraries in our own JVM, and use the Kafka high-level consumer for
>> > > partition distribution and such. This would let us "dockerize" our
>> > > application and run/scale in Kubernetes.
>> > >
>> > > However, as I read it, this new deployment model is ONLY for the
>> > > new(er) high-level API, correct? Is there a plan and/or resources
>> > > for adapting this back to existing low-level tasks?
>> > > How complicated of a task is that? Do I have any other options to
>> > > make this transition easier?
>> > >
>> > > Thanks in advance.
>> > > Thunder
>> >
>> > --
>> > Jagadish V,
>> > Graduate Student,
>> > Department of Computer Science,
>> > Stanford University

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
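[For readers following the thread: the static "modulo N" id-based assignment described above for the PassthroughJC + static partitions scenario can be sketched as plain arithmetic. This is an illustrative stand-alone computation with a hypothetical class name, not Samza's actual SystemStreamPartitionGrouper/TaskNameGrouper API.]

```java
// Sketch of static "modulo N" partition assignment: every processor knows
// its own id (e.g. from config or a Kubernetes pod ordinal in [0, N)) and
// all processors deterministically compute the same disjoint assignment,
// so no leader election is needed. Class/method names are hypothetical.
public class ModuloPartitionAssignment {

    // Returns the processor id (in [0, numProcessors)) that owns the
    // given input partition.
    static int ownerOf(int partition, int numProcessors) {
        return partition % numProcessors;
    }

    public static void main(String[] args) {
        int numProcessors = 3; // e.g. three pods with ids 0, 1, 2
        // Each instance would keep only the partitions mapping to its id.
        for (int partition = 0; partition < 8; partition++) {
            System.out.println("partition " + partition
                + " -> processor " + ownerOf(partition, numProcessors));
        }
    }
}
```

Because the mapping is a pure function of the partition number and N, every instance arrives at the same answer independently, which is exactly what lets the PassthroughJC skip the "partition distribution" consensus step.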