samza-dev mailing list archives

From Tom Davis <...@recursivedream.com>
Subject Re: Old style "low level" Tasks with alternative deployment model(s)
Date Tue, 20 Mar 2018 18:24:06 GMT
What format would that be best in? Happy to put something together this
weekend as well.

Jagadish Venkatraman <jagadish1989@gmail.com> writes:

> Hi Thunder,
>
> Thank you for the PR. Really nice work!
>
> Since you have a working implementation on K8s, would you be willing to
> contribute a short tutorial / post on this? We'll be sure to feature it
> on the official Samza website at http://samza.apache.org/.
>
> It'd be a great addition to the Samza community to have a section on K8s
> integration! There have been multiple prior requests for this, and your
> learnings would be super-helpful.
>
> Best,
> Jagdish
>
>
>
> On Tue, Mar 20, 2018 at 10:46 AM, Prateek Maheshwari <prateekmi2@gmail.com>
> wrote:
>
>> Glad you were able to figure it out; that was very confusing. Thanks for
>> the fix too.
>>
>> - Prateek
>>
>> On Mon, Mar 19, 2018 at 9:58 PM, Thunder Stumpges <tstumpges@ntent.com>
>> wrote:
>>
>>> And that last issue was mine. My setting override was not being picked up,
>>> so it was using GroupByContainerCount instead.
>>> -Thanks,
>>> Thunder
>>>
>>>
>>> -----Original Message-----
>>> From: Thunder Stumpges
>>> Sent: Monday, March 19, 2018 20:58
>>> To: dev@samza.apache.org
>>> Cc: Jagadish Venkatraman <jagadish1989@gmail.com>; tom@recursivedream.com;
>>> Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>>> Subject: RE: Old style "low level" Tasks with alternative deployment
>>> model(s)
>>>
>>> Well, I figured it out. My specific issue was due to a simple dependency
>>> problem where I had pulled in an older version of the jackson-mapper library.
>>> However, the code was throwing a NoSuchMethodError (an Error rather than an
>>> Exception), which was being silently dropped. I created a pull request to
>>> handle any Throwable in ScheduleAfterDebounceTime:
>>> https://github.com/apache/samza/pull/450
>>>
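>>> For anyone following along: the gist of that fix is just widening the catch
>>> around the scheduled action so Errors get logged too. A rough sketch of the
>>> idea (hypothetical class and names, not the actual patch; see the PR for the
>>> real change):
>>>
>>>   import org.slf4j.Logger;
>>>   import org.slf4j.LoggerFactory;
>>>
>>>   // Sketch only: wrap a scheduled action so Errors (e.g. NoSuchMethodError)
>>>   // are logged instead of being silently dropped by the executor.
>>>   public final class SafeAction implements Runnable {
>>>     private static final Logger LOG = LoggerFactory.getLogger(SafeAction.class);
>>>     private final String name;
>>>     private final Runnable action;
>>>
>>>     public SafeAction(String name, Runnable action) {
>>>       this.name = name;
>>>       this.action = action;
>>>     }
>>>
>>>     @Override
>>>     public void run() {
>>>       try {
>>>         action.run();
>>>       } catch (Throwable t) {   // Throwable, not just Exception
>>>         LOG.error("Execution of action: " + name + " failed.", t);
>>>       }
>>>     }
>>>   }
>>>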
>>> I'm now running into an issue with the generation of the JobModel and the
>>> ProcessorId. The ZkJobCoordinator has a ProcessorId that is a GUID, but
>>> when the GroupByContainerIds class (my TaskNameGrouper) creates the
>>> ContainerModels, it uses the ContainerId (a numeric value: 0, 1, 2, etc.)
>>> as the ProcessorId (~ line 105). As a result, the JobModel that is generated
>>> and published immediately causes the processor to quit with this
>>> message:
>>>
>>> INFO  o.apache.samza.zk.ZkJobCoordinator - New JobModel does not contain
>>> pid=38c637bf-9c2b-4856-afc4-5b1562711cfb. Stopping this processor.
>>>
>>> I was assuming I should be using GroupByContainerIds as my
>>> TaskNameGrouper. I don't see any other promising implementations. Am I just
>>> missing something?
>>>
>>> Thanks,
>>> Thunder
>>>
>>> JobModel
>>> {
>>>   "config" : {
>>>   ...
>>>   },
>>>   "containers" : {
>>>     "0" : {
>>>       "tasks" : {
>>>         "Partition 0" : {
>>>           "task-name" : "Partition 0",
>>>           "system-stream-partitions" : [ {
>>>             "system" : "kafka",
>>>             "partition" : 0,
>>>             "stream" : "test_topic1"
>>>           }, {
>>>             "system" : "kafka",
>>>             "partition" : 0,
>>>             "stream" : "test_topic2"
>>>           } ],
>>>           "changelog-partition" : 0
>>>         },
>>>         "Partition 1" : {
>>>           "task-name" : "Partition 1",
>>>           "system-stream-partitions" : [ {
>>>             "system" : "kafka",
>>>             "partition" : 1,
>>>             "stream" : "test_topic1"
>>>           }, {
>>>             "system" : "kafka",
>>>             "partition" : 1,
>>>             "stream" : "test_topic2"
>>>           } ],
>>>           "changelog-partition" : 1
>>>         }
>>>       },
>>>       "container-id" : 0,
>>>       "processor-id" : "0"
>>>     }
>>>   },
>>>   "max-change-log-stream-partitions" : 2,
>>>   "all-container-locality" : {
>>>     "0" : null
>>>   }
>>> }
>>>
>>> -----Original Message-----
>>> From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>> Sent: Friday, March 16, 2018 18:21
>>> To: dev@samza.apache.org
>>> Cc: Jagadish Venkatraman <jagadish1989@gmail.com>; tom@recursivedream.com;
>>> Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>>> Subject: RE: Old style "low level" Tasks with alternative deployment
>>> model(s)
>>>
>>> Attached. I don't see any threads actually running this code, which is odd.
>>>
>>> There's my main thread, which is waiting for the whole thing to finish, and
>>> the "debounce-thread-0" thread (which logged the other surrounding messages
>>> below) has this:
>>>
>>> "debounce-thread-0" #18 daemon prio=5 os_prio=0 tid=0x00007fa0fd719800
>>> nid=0x21 waiting on condition [0x00007fa0d0d45000]
>>>    java.lang.Thread.State: WAITING (parking)
>>>         at sun.misc.Unsafe.park(Native Method)
>>>         - parking to wait for  <0x00000006f166e350> (a
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java
>>> :175)
>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$Condit
>>> ionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWork
>>> Queue.take(ScheduledThreadPoolExecutor.java:1081)
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWork
>>> Queue.take(ScheduledThreadPoolExecutor.java:809)
>>>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolEx
>>> ecutor.java:1067)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>>> Executor.java:1127)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>>> lExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>>    Locked ownable synchronizers:
>>>         - None
>>>
>>> Thanks for having a look.
>>> Thunder
>>>
>>>
>>> -----Original Message-----
>>> From: Prateek Maheshwari [mailto:prateekmi2@gmail.com]
>>> Sent: Friday, March 16, 2018 17:02
>>> To: dev@samza.apache.org
>>> Cc: Jagadish Venkatraman <jagadish1989@gmail.com>; tom@recursivedream.com;
>>> Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>>> Subject: Re: Old style "low level" Tasks with alternative deployment
>>> model(s)
>>>
>>> Hi Thunder,
>>>
>>> Can you please take and attach a thread dump with this?
>>>
>>> Thanks,
>>> Prateek
>>>
>>> On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges <tstumpges@ntent.com>
>>> wrote:
>>>
>>> > It appears it IS hung while serializing the JobModel... very strange!
>>> > I added some debug statements around the calls:
>>> >
>>> >       LOG.debug("Getting object mapper to serialize job model");  //
>>> > this IS printed
>>> >       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>>> >       LOG.debug("Serializing job model"); // this IS printed
>>> >       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter
>>> > ().writeValueAsString(jobModel);
>>> >       LOG.info("jobModelAsString=" + jobModelStr); // this is NOT
>>> printed!
>>> >
>>> > Another thing I noticed is that "getObjectMapper" actually creates the
>>> > object mapper twice!
>>> >
>>> > 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG
>>> > org.apache.samza.zk.ZkUtils - Getting object mapper to serialize job
>>> > model
>>> > 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG
>>> > o.a.s.s.model.SamzaObjectMapper
>>> > - Creating new object mapper and simple module
>>> > 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG
>>> > o.a.s.s.model.SamzaObjectMapper
>>> > - Adding SerDes and mixins
>>> > 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG
>>> > o.a.s.s.model.SamzaObjectMapper
>>> > - Adding custom ContainerModel deserializer
>>> > 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG
>>> > o.a.s.s.model.SamzaObjectMapper
>>> > - Setting up naming strategy and registering module
>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>> > o.a.s.s.model.SamzaObjectMapper
>>> > - Done!
>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>> > o.a.s.s.model.SamzaObjectMapper
>>> > - Creating new object mapper and simple module
>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>> > o.a.s.s.model.SamzaObjectMapper
>>> > - Adding SerDes  and mixins
>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>> > o.a.s.s.model.SamzaObjectMapper
>>> > - Adding custom ContainerModel deserializer
>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>> > o.a.s.s.model.SamzaObjectMapper
>>> > - Setting up naming strategy and registering module
>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>> > o.a.s.s.model.SamzaObjectMapper
>>> > - Done!
>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>> > org.apache.samza.zk.ZkUtils - Serializing job model
>>> >
>>> > Could this ObjectMapper be a singleton? I see there is a private
>>> > static instance, but getObjectMapper creates a new one every time...
>>> >
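>>> > If it were made a singleton, I'd expect the usual lazily-initialized pattern,
>>> > something like this sketch (hypothetical holder class and helper, not the
>>> > actual SamzaObjectMapper code):
>>> >
>>> >   import org.codehaus.jackson.map.ObjectMapper;
>>> >
>>> >   // Sketch only: build the configured mapper once and reuse it.
>>> >   public final class SamzaObjectMapperHolder {
>>> >     private static volatile ObjectMapper instance;
>>> >
>>> >     public static ObjectMapper getObjectMapper() {
>>> >       if (instance == null) {
>>> >         synchronized (SamzaObjectMapperHolder.class) {
>>> >           if (instance == null) {
>>> >             instance = buildConfiguredMapper();
>>> >           }
>>> >         }
>>> >       }
>>> >       return instance;
>>> >     }
>>> >
>>> >     // Stand-in for the existing setup code that registers the SerDes,
>>> >     // mixins, and naming strategy.
>>> >     private static ObjectMapper buildConfiguredMapper() {
>>> >       return new ObjectMapper();
>>> >     }
>>> >   }
>>> >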
>>> > Anyway, then it takes off to serialize the job model and never comes
>>> > back...
>>> >
>>> > Hoping someone has some idea here... the implementation for this
>>> > mostly comes from Jackson-mapper-asl, and I have the version that is
>>> > linked in the
>>> > 0.14.0 tag:
>>> > |    |    |    +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
>>> > |    |    |    |    \--- org.codehaus.jackson:jackson-core-asl:1.9.13
>>> >
>>> > Thanks!
>>> > Thunder
>>> >
>>> > -----Original Message-----
>>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>> > Sent: Friday, March 16, 2018 15:29
>>> > To: dev@samza.apache.org; Jagadish Venkatraman
>>> > <jagadish1989@gmail.com>
>>> > Cc: tom@recursivedream.com; Yipan@linkedin.com; Yi Pan <
>>> > nickpan47@gmail.com>
>>> > Subject: RE: Old style "low level" Tasks with alternative deployment
>>> > model(s)
>>> >
>>> > So, my investigation starts at StreamProcessor.java.  Line 294 in
>>> > method
>>> > onNewJobModel() logs an INFO message that it's starting a container.
>>> > This message never appears.
>>> >
>>> > I see that ZkJobCoordinator calls onNewJobModel from its
>>> > onNewJobModelConfirmed method, which also logs an info message stating
>>> > "version X of the job model got confirmed". I never see this message
>>> > either, so I go up the chain some more.
>>> >
>>> > I DO see:
>>> >
>>> > 2018-03-16 21:43:58 logback 20498
>>> > [ZkClient-EventThread-13-10.0.127.114:2181]
>>> > INFO  o.apache.samza.zk.ZkJobCoordinator -
>>> > ZkJobCoordinator::onBecomeLeader
>>> > - I became the leader!
>>> > And
>>> > 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO
>>> > o.apache.samza.zk.ZkJobCoordinator -
>>> > pid=91e07d20-ae33-4156-a5f3-534a95642133Generated
>>> > new Job Model. Version = 1
>>> >
>>> > Which led me to the method onDoProcessorChange, line 210. I see that line
>>> > logged, but not the one below it ("Published new Job Model. Version ="), so
>>> > something in here is not completing:
>>> >
>>> >     LOG.info("pid=" + processorId + "Generated new Job Model. Version =
>>> "
>>> > + nextJMVersion);
>>> >
>>> >     // Publish the new job model
>>> >     zkUtils.publishJobModel(nextJMVersion, jobModel);
>>> >
>>> >     // Start the barrier for the job model update
>>> >     barrier.create(nextJMVersion, currentProcessorIds);
>>> >
>>> >     // Notify all processors about the new JobModel by updating
>>> > JobModel Version number
>>> >     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
>>> >
>>> >     LOG.info("pid=" + processorId + "Published new Job Model. Version =
>>> "
>>> > + nextJMVersion);
>>> >
>>> > As I mentioned, after the line "Generated new Job Model. Version = 1"
>>> > I just get repeated zk ping responses... no more application logging.
>>> >
>>> > The very next thing that runs is zkUtils.publishJobModel(), which only
>>> > has two lines before another log statement (which I don't see):
>>> >
>>> >   public void publishJobModel(String jobModelVersion, JobModel jobModel) {
>>> >     try {
>>> >       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>>> >       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>>> >       LOG.info("jobModelAsString=" + jobModelStr);
>>> >       ...
>>> >
>>> > Could it really be getting hung up on one of these two lines? (seems
>>> > like it must be, but I don't see anything there that seems like it
>>> > would just hang). I'll keep troubleshooting, maybe add some more debug
>>> > logging and try again.
>>> >
>>> > Thanks for any guidance you all might have.
>>> > -Thunder
>>> >
>>> >
>>> > -----Original Message-----
>>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>> > Sent: Friday, March 16, 2018 14:43
>>> > To: dev@samza.apache.org; Jagadish Venkatraman
>>> > <jagadish1989@gmail.com>
>>> > Cc: tom@recursivedream.com; Yipan@linkedin.com; Yi Pan <
>>> > nickpan47@gmail.com>
>>> > Subject: RE: Old style "low level" Tasks with alternative deployment
>>> > model(s)
>>> >
>>> > Well, I have my stand-alone application in Docker and running in
>>> > Kubernetes. I think something isn't wired up all the way, though,
>>> > because my task never actually gets invoked. I see no errors; however,
>>> > I'm not getting the usual startup logs (checking existing offsets,
>>> > "entering run loop", ...). My logs look like this:
>>> >
>>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO
>>> > kafka.utils.VerifiableProperties
>>> > - Verifying properties
>>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO
>>> > kafka.utils.VerifiableProperties
>>> > - Property client.id is overridden to samza_admin-test_stream_task-1
>>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO
>>> > kafka.utils.VerifiableProperties
>>> > - Property metadata.broker.list is overridden to
>>> > test-kafka-kafka.test-svc:9092
>>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO
>>> > kafka.utils.VerifiableProperties
>>> > - Property request.timeout.ms is overridden to 30000
>>> > 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO
>>> > kafka.client.ClientUtils$ - Fetching metadata from broker
>>> > BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation id 0
>>> > for 1 topic(s) Set(dev_k8s.samza.test.topic)
>>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG
>>> > kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 30000
>>> > (requested 30000), SO_RCVBUF = 179680 (requested -1), SO_SNDBUF =
>>> > 102400 (requested 102400), connectTimeoutMs = 30000.
>>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO
>>> > kafka.producer.SyncProducer - Connected to
>>> > test-kafka-kafka.test-svc:9092 for producing
>>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO
>>> > kafka.producer.SyncProducer - Disconnecting from
>>> > test-kafka-kafka.test-svc:9092
>>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG
>>> > kafka.client.ClientUtils$ - Successfully fetched metadata for 1
>>> > topic(s)
>>> > Set(dev_k8s.samza.test.topic)
>>> > 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO
>>> > o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper
>>> > org.apache.samza.container.grouper.stream.GroupByPartition@1a7158cc
>>> > has grouped the SystemStreamPartitions into 10 tasks with the
>>> > following
>>> > taskNames: [Partition 1, Partition 0, Partition 3, Partition 2,
>>> > Partition 5, Partition 4, Partition 7, Partition 6, Partition 9,
>>> > Partition 8]
>>> > 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO
>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 0 is being
>>> > assigned changelog partition 0.
>>> > 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO
>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 1 is being
>>> > assigned changelog partition 1.
>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 2 is being
>>> > assigned changelog partition 2.
>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 3 is being
>>> > assigned changelog partition 3.
>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 4 is being
>>> > assigned changelog partition 4.
>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 5 is being
>>> > assigned changelog partition 5.
>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 6 is being
>>> > assigned changelog partition 6.
>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 7 is being
>>> > assigned changelog partition 7.
>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 8 is being
>>> > assigned changelog partition 8.
>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 9 is being
>>> > assigned changelog partition 9.
>>> > 2018-03-16 21:05:55 logback 50838 [main-SendThread(10.0.127.114:2181)]
>>> > DEBUG org.apache.zookeeper.ClientCnxn - Reading reply
>>> > sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null serverPath:null
>>> > finished:false header:: 23,4  replyHeader:: 23,14024,0  request::
>>> > '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/
>>> > JobModelGeneration/jobModelVersion,T  response::
>>> > ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878}
>>> > 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO
>>> > o.apache.samza.zk.ZkJobCoordinator -
>>> > pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated
>>> > new Job Model. Version = 1
>>> > 2018-03-16 21:06:05 logback 60848 [main-SendThread(10.0.127.114:2181)]
>>> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for
>>> sessionid:
>>> > 0x1622c8b5fc01ac7 after 2ms
>>> > 2018-03-16 21:06:15 logback 70856 [main-SendThread(10.0.127.114:2181)]
>>> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for
>>> sessionid:
>>> > 0x1622c8b5fc01ac7 after 1ms
>>> > 2018-03-16 21:06:25 logback 80865 [main-SendThread(10.0.127.114:2181)]
>>> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for
>>> sessionid:
>>> > 0x1622c8b5fc01ac7 after 2ms ...
>>> >
>>> > The zk ping responses continue every 10 seconds, but no other activity
>>> > or messages occur.
>>> > It looks like it gets as far as confirming the JobModel and grouping
>>> > the partitions, but nothing actually starts up.
>>> >
>>> > Any ideas?
>>> > Thanks in advance!
>>> > Thunder
>>> >
>>> >
>>> > -----Original Message-----
>>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>> > Sent: Thursday, March 15, 2018 16:35
>>> > To: Jagadish Venkatraman <jagadish1989@gmail.com>
>>> > Cc: dev@samza.apache.org; tom@recursivedream.com; Yipan@linkedin.com;
>>> > Yi Pan <nickpan47@gmail.com>
>>> > Subject: RE: Old style "low level" Tasks with alternative deployment
>>> > model(s)
>>> >
>>> > Thanks a lot for the info. I have something basically working at this
>>> > point! I have not integrated it with Docker or Kubernetes yet, but it
>>> > does run on my local machine.
>>> >
>>> > I have determined that LocalApplicationRunner does NOT do config
>>> > rewriting. I had to write my own little “StandAloneApplicationRunner”
>>> > that handles the “main” entrypoint. It does command-line parsing using
>>> > CommandLine, loads the config from a ConfigFactory, and performs rewriting
>>> > before creating the new LocalApplicationRunner instance. This is
>>> > all my StandAloneApplicationRunner contains:
>>> >
>>> >
>>> > object StandAloneSamzaRunner extends App with LazyLogging {
>>> >
>>> >   // parse command line args just like JobRunner.
>>> >   val cmdline = new ApplicationRunnerCommandLine
>>> >   val options = cmdline.parser.parse(args: _*)
>>> >   val config = cmdline.loadConfig(options)
>>> >
>>> >   val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
>>> >   runner.runTask()
>>> >   runner.waitForFinish()
>>> > }
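>>> >
>>> > In case it helps anyone else, I launch it much the same way run-job.sh
>>> > invokes JobRunner (flag names come from Samza's standard CommandLine;
>>> > the class and paths below are just placeholders for my setup):
>>> >
>>> > java -cp "/opt/my-job/lib/*" com.example.StandAloneSamzaRunner \
>>> >   --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
>>> >   --config-path=file:///opt/my-job/config/my-job.properties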
>>> >
>>> > The only config settings I needed to change to use this runner were
>>> > (easily configured thanks to our central Consul config system and our
>>> > rewriter):
>>> >
>>> > # use the ZK based job coordinator
>>> > job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
>>> > # need to use GroupByContainerIds instead of GroupByContainerCount
>>> > task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
>>> > # ZKJC config
>>> > job.coordinator.zk.connect=<our_zk_connection>
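>>> >
>>> > (And for completeness, the rewriter itself is wired in through config as
>>> > well; key names per the Samza config table, with a made-up rewriter class
>>> > here:)
>>> >
>>> > # example only: registering a custom config rewriter (class is hypothetical)
>>> > job.config.rewriters=consul
>>> > job.config.rewriter.consul.class=com.example.ConsulConfigRewriter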
>>> >
>>> > I did run into one potential problem: as you can see above, I start the
>>> > task using runTask() and then call waitForFinish() to prevent my main
>>> > method from returning. The first time I ran it, the job itself failed
>>> > because I had forgotten to override the task grouper, and the container
>>> > count was pulled from our staging environment. Some failures are logged
>>> > and it appears the JobCoordinator fails, but it never returns from
>>> > waitForFinish(). The stack trace and the continuation of the log are below:
>>> >
>>> > 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: OnProcessorChange failed.
>>> > java.lang.IllegalArgumentException: Your container count (4) is larger than your task count (2). Can't have containers with nothing to do, so aborting.
>>> >        at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:212)
>>> >        at org.apache.samza.container.grouper.task.GroupByContainerCount.group(GroupByContainerCount.java:62)
>>> >        at org.apache.samza.container.grouper.task.TaskNameGrouper.group(TaskNameGrouper.java:56)
>>> >        at org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:266)
>>> >        at org.apache.samza.coordinator.JobModelManager.readJobModel(JobModelManager.scala)
>>> >        at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJobCoordinator.java:306)
>>> >        at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:197)
>>> >        at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:318)
>>> >        at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134)
>>> >        at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>>> >        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>>> >        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>>> >        at java.util.concurrent.FutureTask.run(FutureTask.java)
>>> >        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>> >        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>> >        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> >        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> >        at java.lang.Thread.run(Thread.java:745)
>>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG o.a.samza.processor.StreamProcessor - Container is not instantiated yet.
>>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG org.I0Itec.zkclient.ZkClient - Closing ZkClient...
>>> > 2018-03-15 22:34:32 logback 77789 [ZkClient-EventThread-15-10.0.127.114:2181] INFO  org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
>>> >
>>> > And then the application continues on with metric reporters and other
>>> > debug logging (not actually running the task, though).
>>> >
>>> > Thanks in advance for the guidance, this has been easier than I imagined!
>>> > I’ll report back when I get more of the Dockerization/Kubernetes
>>> > running and test it a bit more.
>>> > Cheers,
>>> > Thunder
>>> >
>>> >
>>> > From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com]
>>> > Sent: Thursday, March 15, 2018 14:46
>>> > To: Thunder Stumpges <tstumpges@ntent.com>
>>> > Cc: dev@samza.apache.org; tom@recursivedream.com; Yipan@linkedin.com;
>>> > Yi Pan <nickpan47@gmail.com>
>>> > Subject: Re: Old style "low level" Tasks with alternative deployment
>>> > model(s)
>>> >
>>> > >> Thanks for the info on the tradeoffs. That makes a lot of sense. I am
>>> > >> on-board with using ZkJobCoordinator, sounds like some good benefits
>>> > >> over just the Kafka high-level consumer.
>>> >
>>> > This certainly looks like the simplest alternative.
>>> >
>>> > For your other questions, please find my answers inline.
>>> >
>>> > >> Q1: If I use LocalApplicationRunner, it does not use
>>> > >> "ProcessJobFactory" (or any StreamJob or *Job classes), correct?
>>> >
>>> > Your understanding is correct. It directly instantiates the
>>> > StreamProcessor, which in turn creates and runs the SamzaContainer.
>>> >
>>> > >> Q2: If I use LocalApplicationRunner, I will need to code myself the
>>> > loading and rewriting of the Config that is currently handled by
>>> > JobRunner, correct?
>>> >
>>> > I don't think you'll need to do this. IIUC, the LocalApplicationRunner
>>> > should automatically invoke rewriters and do the right thing.
>>> >
>>> > >>  Q3: Do I need to also handle coordinator stream(s) and storing of
>>> > config that is done in JobRunner (I don’t think so as the ?
>>> >
>>> > I don't think this is necessary either. The creation of the coordinator
>>> > stream and the persisting of configuration happen in the
>>> > LocalApplicationRunner (more specifically, in StreamManager#createStreams).
>>> >
>>> > >> Q4: Where/How do I specify the Container ID for each instance? Is
>>> > >> there
>>> > a config setting that I can pass, (or pull from an env variable and
>>> > add to the config) ? I am assuming it is my responsibility to ensure
>>> > that each instance is started with a unique container ID..?
>>> >
>>> > Nope. If you are using the ZkJobCoordinator, you do not have to
>>> > worry about assigning IDs to each instance. The framework will
>>> > automatically take care of generating IDs and reaching consensus by
>>> > electing a leader. If you are curious, please take a look at
>>> > implementations of the ProcessorIdGenerator interface.
>>> >
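>>> > (For the curious, such a generator is a small class along these lines; treat
>>> > this as a sketch and double-check the exact interface signature against the
>>> > Samza version you build with:)
>>> >
>>> >   import org.apache.samza.config.Config;
>>> >   import org.apache.samza.runtime.ProcessorIdGenerator;
>>> >
>>> >   // Sketch: derive the processor id from the environment, e.g. a
>>> >   // Kubernetes StatefulSet pod name like "my-job-2" -> "2".
>>> >   public class EnvProcessorIdGenerator implements ProcessorIdGenerator {
>>> >     @Override
>>> >     public String generateProcessorId(Config config) {
>>> >       String podName = System.getenv("HOSTNAME");
>>> >       return podName.substring(podName.lastIndexOf('-') + 1);
>>> >     }
>>> >   }
>>> >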
>>> > Please let us know should you have further questions!
>>> >
>>> > Best,
>>> > Jagdish
>>> >
>>> > On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges
>>> > <tstumpges@ntent.com <mailto:tstumpges@ntent.com>> wrote:
>>> >
>>> > Thanks for the info on the tradeoffs. That makes a lot of sense. I am
>>> > on-board with using ZkJobCoordinator, sounds like some good benefits
>>> > over just the Kafka high-level consumer.
>>> >
>>> >
>>> >
>>> > To that end, I have made some notes on possible approaches based on
>>> > the previous thread, and from my look into the code. I’d love to get
>>> feedback.
>>> >
>>> >
>>> >
>>> > Approach 1. Configure jobs to use “ProcessJobFactory” and run
>>> > instances of the job using run-job.sh or using JobRunner directly.
>>> >
>>> > I don’t think this makes sense from what I can see for a few reasons:
>>> >
>>> >   *   JobRunner is concerned with stuff I don't *think* we need:
>>> >
>>> >      *   coordinatorSystemProducer|Consumer,
>>> >      *   writing/reading the configuration to the coordinator streams
>>> >
>>> >   *   ProcessJobFactory hard-codes the ID to “0” so I don’t think that
>>> > will work for multiple instances.
>>> >
>>> >
>>> >
>>> > Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and
>>> > invoke
>>> > LocalApplicationRunner.runTask()
>>> >
>>> >
>>> >
>>> >     Q1: If I use LocalApplicationRunner, it does not use
>>> > "ProcessJobFactory" (or any StreamJob or *Job classes), correct?
>>> >
>>> >     Q2: If I use LocalApplicationRunner, I will need to code myself
>>> > the loading and rewriting of the Config that is currently handled by
>>> > JobRunner, correct?
>>> >
>>> >     Q3: Do I need to also handle coordinator stream(s) and storing of
>>> > config that is done in JobRunner (I don’t think so as the ?
>>> >
>>> >     Q4: Where/How do I specify the Container ID for each instance? Is
>>> > there a config setting that I can pass, (or pull from an env variable
>>> > and add to the config) ? I am assuming it is my responsibility to
>>> > ensure that each instance is started with a unique container ID..?
>>> >
>>> > I am getting started on the above (Approach 2.), and looking closer at
>>> > the code so I may have my own answers to my questions, but figured I
>>> > should go ahead and ask now anyway. Thanks!
>>> >
>>> > -Thunder
>>> >
>>> >
>>> > From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com<mailto:
>>> > jagadish1989@gmail.com>]
>>> > Sent: Thursday, March 15, 2018 1:41
>>> > To: dev@samza.apache.org<mailto:dev@samza.apache.org>; Thunder
>>> > Stumpges < tstumpges@ntent.com<mailto:tstumpges@ntent.com>>;
>>> > tom@recursivedream.com <mailto:tom@recursivedream.com>
>>> > Cc: Yipan@linkedin.com<mailto:Yipan@linkedin.com>; Yi Pan <
>>> > nickpan47@gmail.com<mailto:nickpan47@gmail.com>>
>>> >
>>> > Subject: Re: Old style "low level" Tasks with alternative deployment
>>> > model(s)
>>> >
>>> > >> You are correct that this is focused on the higher-level API but
>>> > >> doesn't preclude using the lower-level API. I was at the same point
>>> > >> you were not long ago, in fact, and had a very productive conversation
>>> > >> on the list
>>> >
>>> > Thanks Tom for linking the thread, and I'm glad that you were able to
>>> > get Kubernetes integration working with Samza.
>>> >
>>> > >> If it is helpful for everyone, once I get the low-level API +
>>> > >> ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an
>>> > >> additional sample for hello-samza.
>>> >
>>> > @Thunder Stumpges:
>>> > We'd be thrilled to receive your contribution. Examples, demos,
>>> > tutorials, etc. contribute a great deal to improving the ease of use of
>>> > Apache Samza. I'm happy to shepherd design discussions and code reviews
>>> > in the open source, including answering any questions you may have.
>>> >
>>> >
>>> > >> One thing I'm still curious about, is what are the drawbacks or
>>> > >> complexities of leveraging the Kafka High-level consumer +
>>> > >> PassthroughJobCoordinator in a stand-alone setup like this? We do
>>> > >> have Zookeeper (because of kafka) so I think either would work. The
>>> > >> Kafka High-level consumer comes with other nice tools for
>>> > >> monitoring offsets, lag, etc
>>> >
>>> >
>>> > @Thunder Stumpges:
>>> >
>>> > Samza uses a "Job-Coordinator" to assign your input partitions among
>>> > the different instances of your application s.t. they don't overlap. A
>>> > typical way to solve this "partition distribution" problem is to have a
>>> > single instance elected as a "leader" and have the leader assign
>>> > partitions to the group. The ZkJobCoordinator uses Zk primitives to
>>> > achieve this, while the YarnJC relies on Yarn's guarantee that there
>>> > will be a singleton AppMaster.
>>> >
>>> > A key difference that separates the PassthroughJC from the Yarn/Zk
>>> > variants is that it does _not_ attempt to solve the "partition
>>> > distribution" problem. As a result, there's no leader-election involved.
>>> > Instead, it pushes the problem of "partition distribution" to the
>>> > underlying consumer.
>>> >
>>> > The PassthroughJC supports these two scenarios:
>>> >
>>> > 1. Consumer-managed partition distribution: When using the Kafka
>>> > high-level consumer (or an AWS KinesisClientLibrary consumer) with
>>> > Samza, the consumer manages partitions internally.
>>> >
>>> > 2. Static partition distribution: Alternately, partitions can be
>>> > managed statically using configuration. You can achieve static
>>> > partition assignment by implementing a custom SystemStreamPartitionGrouper
>>> > <https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>
>>> > and TaskNameGrouper
>>> > <https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>.
>>> > Solutions in this category will typically require you to distinguish
>>> > the various processors in the group by providing an "id" for each.
>>> > Once the "id"s are decided, you can then statically compute
>>> > assignments using a function (eg: modulo N; a toy sketch of this
>>> > computation is shown below).
>>> > You can rely on the following mechanisms to provide this id:
>>> >  - Configure each instance differently to have its own id
>>> >  - Obtain the id from the cluster-manager. For instance, Kubernetes
>>> > will provide each POD a unique id in the range [0,N). AWS ECS should
>>> > expose similar capabilities via a REST end-point.
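>>> >
>>> > To make the "modulo N" idea concrete, here is the toy computation referenced
>>> > above (plain Java, not a Samza API): instance k of N owns every partition p
>>> > where p % N == k.
>>> >
>>> >   import java.util.ArrayList;
>>> >   import java.util.List;
>>> >
>>> >   // Toy sketch of static partition assignment by instance id.
>>> >   static List<Integer> ownedPartitions(int instanceId, int numInstances, int numPartitions) {
>>> >     List<Integer> owned = new ArrayList<>();
>>> >     for (int p = 0; p < numPartitions; p++) {
>>> >       if (p % numInstances == instanceId) {
>>> >         owned.add(p);
>>> >       }
>>> >     }
>>> >     return owned;
>>> >   }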
>>> >
>>> > >> One thing I'm still curious about, is what are the drawbacks or
>>> > complexities of leveraging the Kafka High-level consumer +
>>> > PassthroughJobCoordinator in a stand-alone setup like this?
>>> >
>>> > Leveraging the Kafka High-level consumer:
>>> >
>>> > The Kafka high-level consumer is not integrated into Samza just yet.
>>> > Instead, Samza's integration with Kafka uses the low-level consumer
>>> > because
>>> > i) It allows for greater control in fetching data from individual brokers.
>>> > It is simple and performant, in terms of the threading model, to have
>>> > one thread pull from each broker.
>>> > ii) It is efficient in memory utilization since it does not do
>>> > internal buffering of messages.
>>> > iii) There's no overhead like Kafka-controller heart-beats that are
>>> > driven by consumer.poll.
>>> >
>>> > Since there's no built-in integration, you will have to build a new
>>> > SystemConsumer if you need to integrate with the Kafka High-level consumer.
>>> > Further, there's a fair bit more complexity to manage in checkpointing.
>>> >
>>> > >> The Kafka High-level consumer comes with other nice tools for
>>> > >> monitoring offsets, lag, etc
>>> >
>>> > Samza exposes
>>> > <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
>>> > the below metrics for lag-monitoring:
>>> > - The current log-end offset for each partition
>>> > - The last check-pointed offset for each partition
>>> > - The number of messages behind the highwatermark of the partition
>>> >
>>> > Please let us know if you need help discovering these or integrating
>>> > these with other systems/tools.
>>> >
>>> >
>>> > Leveraging the Passthrough JobCoordinator:
>>> >
>>> > It's helpful to split this discussion of the tradeoffs with the
>>> > PassthroughJC into two parts:
>>> >
>>> > 1. PassthroughJC + consumer managed partitions:
>>> >
>>> > - In this model, Samza has no control over partition assignment since
>>> > it's managed by the consumer. This means that stateful operations like
>>> > joins, which rely on partitions being co-located on the same task, will
>>> > not work. Simple stateless operations (eg: map, filter, remote lookups)
>>> > are fine.
>>> >
>>> > - A key differentiator between Samza and other frameworks is our
>>> > support for "host affinity"
>>> > <https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>.
>>> > Samza achieves this by assigning partitions to hosts taking data-locality
>>> > into account. If the consumer can arbitrarily shuffle partitions, it'd be
>>> > hard to support this affinity/locality. Often this is a key optimization
>>> > when dealing with large stateful jobs.
>>> >
>>> > 2. PassthroughJC + static partitions:
>>> >
>>> > - In this model, it is possible to make stateful processing (including
>>> > host affinity) work by carefully choosing how "id"s are assigned and
>>> > computed.
>>> >
>>> > Recommendation:
>>> >
>>> > - Owing to the above subtleties, I would recommend that we give the
>>> > ZkJobCoordinator + the built-in low-level Kafka integration a try.
>>> > - If we hit snags down this path, we can certainly explore the
>>> > approach with PassthroughJC + static partitions.
>>> > - Using the PassthroughJC + consumer-managed distribution would be
>>> > least preferable owing to the subtleties I outlined above.
>>> >
>>> > Please let us know should you have more questions.
>>> >
>>> > Best,
>>> > Jagdish
>>> >
>>> > On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges <tstumpges@ntent.com
>>> > <mailto:tstumpges@ntent.com>> wrote:
>>> > Wow, what great timing, and what a great thread! I definitely have
>>> > some good starters to go off of here.
>>> >
>>> > If it is helpful for everyone, once I get the low-level API +
>>> > ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an
>>> > additional sample for hello-samza.
>>> >
>>> > One thing I'm still curious about, is what are the drawbacks or
>>> > complexities of leveraging the Kafka High-level consumer +
>>> > PassthroughJobCoordinator in a stand-alone setup like this? We do have
>>> > Zookeeper (because of kafka) so I think either would work. The Kafka
>>> > High-level consumer comes with other nice tools for monitoring
>>> > offsets, lag, etc....
>>> >
>>> > Thanks guys!
>>> > -Thunder
>>> >
>>> > -----Original Message-----
>>> > From: Tom Davis [mailto:tom@recursivedream.com<mailto:
>>> > tom@recursivedream.com>]
>>> > Sent: Wednesday, March 14, 2018 17:50
>>> > To: dev@samza.apache.org<mailto:dev@samza.apache.org>
>>> > Subject: Re: Old style "low level" Tasks with alternative deployment
>>> > model(s)
>>> >
>>> > Hey there!
>>> >
>>> > You are correct that this is focused on the higher-level API but
>>> > doesn't preclude using the lower-level API. I was at the same point
>>> > you were not long ago, in fact, and had a very productive conversation
>>> > on the list: you should look for "Question about custom StreamJob/Factory"
>>> > in the list archive for the past couple months.
>>> >
>>> > I'll quote Jagadish Venkatraman from that thread:
>>> >
>>> > > For the section on the low-level API, can you use
>>> > > LocalApplicationRunner#runTask()? It basically creates a new
>>> > > StreamProcessor and runs it. Remember to provide task.class and set
>>> > > it to your implementation of StreamTask or AsyncStreamTask. Please
>>> > > note that this is an evolving API and hence, subject to change.
>>> >
>>> > I ended up just switching to the high-level API because I don't have
>>> > any existing Tasks and the Kubernetes story is a little more
>>> > straightforward there (there's only one container/configuration to deploy).
>>> >
>>> > Best,
>>> >
>>> > Tom
>>> >
>>> > Thunder Stumpges <tstumpges@ntent.com<mailto:tstumpges@ntent.com>>
>>> writes:
>>> >
>>> > > Hi all,
>>> > >
>>> > > We are using Samza (0.12.0) in about 2 dozen jobs implementing
>>> > > several processing pipelines. We have also begun a significant move
>>> > > of other services within our company to Docker/Kubernetes. Right now
>>> > > our Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce"
>>> > > jobs
>>> > (many reporting and other batch processing jobs). We would really like
>>> > to move our stream processing off of Hadoop/Yarn and onto Kubernetes.
>>> > >
>>> > > When I just read about some of the new progress in .13 and .14, I got
>>> > > really excited! We would love to have our jobs run as simple
>>> > > libraries in our own JVM, and use the Kafka High-Level Consumer for
>>> > > partition distribution and such. This would let us "dockerize" our
>>> > > application and run/scale in Kubernetes.
>>> > >
>>> > > However, as I read it, this new deployment model is ONLY for the
>>> > > new(er) High Level API, correct? Is there a plan and/or resources
>>> > > for adapting this back to existing low-level tasks? How complicated
>>> > > of a task is that? Do I have any other options to make this
>>> > > transition easier?
>>> > >
>>> > > Thanks in advance.
>>> > > Thunder
>>> >
>>> >
>>> >
>>> > --
>>> > Jagadish V,
>>> > Graduate Student,
>>> > Department of Computer Science,
>>> > Stanford University
>>> >
>>> >
>>> >
>>> > --
>>> > Jagadish V,
>>> > Graduate Student,
>>> > Department of Computer Science,
>>> > Stanford University
>>> >
>>>
>>
>>

