samza-dev mailing list archives

From Tom Davis <...@recursivedream.com>
Subject Re: Old style "low level" Tasks with alternative deployment model(s)
Date Tue, 20 Mar 2018 23:04:03 GMT
Sounds good to me. I'll reply when I have a PR up and hand it off to you!

Thunder Stumpges <tstumpges@ntent.com> writes:

> I'm out this weekend, but would be glad to collaborate with Tom to get some documentation together. I'll let him take 
> the lead on it, and then maybe I can contribute my information on "running low-level api jobs in standalone mode" and 
> tips on kubernetes (which is really just like any other java application deployment once you get the job running 
> stand-alone)
>
> Thanks everyone, I have my job(s) running successfully in K8s at this time!
>
> -Thunder
>
> -----Original Message-----
> From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com]
> Sent: Tuesday, March 20, 2018 11:36
> To: Tom Davis <tom@recursivedream.com>
> Cc: Prateek Maheshwari <prateekmi2@gmail.com>; dev@samza.apache.org; Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
> Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>
> Hi Tom,
>
>>> Happy to put something together this weekend as well.
>
> Great, can't wait!!
>
>>> What format would that be best in?
>
> You can open a PR in markdown format.
>
> Here's an example PR for Kinesis:
> https://github.com/apache/samza/pull/384/files/
> Here's how it looks and renders in our web-page:
> https://samza.apache.org/learn/documentation/0.14/aws/kinesis.html
>
> Best,
> Jagdish
>
> On Tue, Mar 20, 2018 at 11:24 AM, Tom Davis <tom@recursivedream.com> wrote:
>
>> What format would that be best in? Happy to put something together
>> this weekend as well.
>>
>> Jagadish Venkatraman <jagadish1989@gmail.com> writes:
>>
>>> Hi Thunder,
>>>
>>> Thank you for the PR. Really nice work!
>>>
>>> Since, you have a working implementation on K8s, would you be willing
>>> to contribute a short tutorial / a post on this? We'll be sure to
>>> feature it in the official Samza web-site at http://samza.apache.org/.
>>>
>>> It'd be a great addition to the Samza community to have a section on
>>> K8s integration! There have been multiple prior asks on this, and
>>> your learnings would be super-helpful.
>>>
>>> Best,
>>> Jagdish
>>>
>>>
>>>
>>> On Tue, Mar 20, 2018 at 10:46 AM, Prateek Maheshwari <
>>> prateekmi2@gmail.com>
>>> wrote:
>>>
>>>> Glad you were able to figure it out, that was very confusing. Thanks
>>>> for the fix too.
>>>>
>>>> - Prateek
>>>>
>>>> On Mon, Mar 19, 2018 at 9:58 PM, Thunder Stumpges
>>>> <tstumpges@ntent.com>
>>>> wrote:
>>>>
>>>>> And that last issue was mine. My setting override was not picked up
>>>>> and it was using GroupByContainerCount instead.
>>>>> -Thanks,
>>>>> Thunder
>>>>>
>>>>>
>>>>> -----Original Message-----
>>>>> From: Thunder Stumpges
>>>>> Sent: Monday, March 19, 2018 20:58
>>>>> To: dev@samza.apache.org
>>>>> Cc: Jagadish Venkatraman <jagadish1989@gmail.com>;
>>>>> tom@recursivedream.com; Yipan@linkedin.com; Yi Pan
>>>>> <nickpan47@gmail.com>
>>>>> Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>>>>
>>>>> Well I figured it out. My specific issue was due to a simple
>>>>> dependency problem where I had gotten an older version of the
>>>>> Jackson-mapper library.
>>>>> However the code was throwing NoSuchMethodError (an Error instead
>>>>> of
>>>>> Exception) and being silently dropped. I created a pull request to
>>>>> handle any Throwable in ScheduleAfterDebounceTime.
>>>>> https://github.com/apache/samza/pull/450
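The silent-drop behavior Thunder describes can be reproduced in isolation: a `ScheduledExecutorService` captures anything a task throws inside the returned `Future`, so an `Error` like `NoSuchMethodError` vanishes unless the action itself catches `Throwable`. A minimal, self-contained sketch (illustrative class and method names, not Samza's actual code):

```java
import java.util.concurrent.*;

public class DebounceSketch {
    // A ScheduledExecutorService swallows anything thrown by a submitted
    // task -- including Errors like NoSuchMethodError -- unless the action
    // catches Throwable itself and reports the failure.
    public static String runAction(Runnable action) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        final StringBuilder outcome = new StringBuilder();
        scheduler.schedule(() -> {
            try {
                action.run();
                outcome.append("ok");
            } catch (Throwable t) { // catch Throwable, not just Exception
                outcome.append("failed: ").append(t.getClass().getSimpleName());
            }
        }, 10, TimeUnit.MILLISECONDS);
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
        return outcome.toString();
    }

    public static void main(String[] args) throws Exception {
        // An Error thrown inside the task never reaches a caller's
        // catch (Exception e) block; only catch (Throwable t) sees it.
        System.out.println(runAction(() -> { throw new NoSuchMethodError("boom"); }));
    }
}
```

With a `catch (Exception e)` in place of `catch (Throwable t)`, the error would be captured by the executor and never logged, which matches the behavior described above.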
>>>>>
>>>>> I'm now running into an issue with the generation of the JobModel
>>>>> and the ProcessorId. The ZkJobCoordinator has a ProcessorId that is
>>>>> a Guid, but when GroupByContainerIds class (my TaskNameGrouper)
>>>>> creates the ContainerModels, it is using the ContainerId (a numeric
>>>>> value,
>>>>> 0,1,2,etc)
>>>>> as the ProcessorId (~ line 105). This results in the JobModel that
>>>>> is generated and published immediately causing the processor to
>>>>> quit with this
>>>>> message:
>>>>>
>>>>> INFO  o.apache.samza.zk.ZkJobCoordinator - New JobModel does not
>>>>> contain pid=38c637bf-9c2b-4856-afc4-5b1562711cfb. Stopping this
>>>>> processor.
>>>>>
>>>>> I was assuming I should be using GroupByContainerIds as my
>>>>> TaskNameGrouper. I don't see any other promising implementations.
>>>>> Am I just missing something?
>>>>>
>>>>> Thanks,
>>>>> Thunder
>>>>>
>>>>> JobModel
>>>>> {
>>>>>   "config" : {
>>>>>   ...
>>>>>   },
>>>>>   "containers" : {
>>>>>     "0" : {
>>>>>       "tasks" : {
>>>>>         "Partition 0" : {
>>>>>           "task-name" : "Partition 0",
>>>>>           "system-stream-partitions" : [ {
>>>>>             "system" : "kafka",
>>>>>             "partition" : 0,
>>>>>             "stream" : "test_topic1"
>>>>>           }, {
>>>>>             "system" : "kafka",
>>>>>             "partition" : 0,
>>>>>             "stream" : "test_topic2"
>>>>>           } ],
>>>>>           "changelog-partition" : 0
>>>>>         },
>>>>>         "Partition 1" : {
>>>>>           "task-name" : "Partition 1",
>>>>>           "system-stream-partitions" : [ {
>>>>>             "system" : "kafka",
>>>>>             "partition" : 1,
>>>>>             "stream" : "test_topic1"
>>>>>           }, {
>>>>>             "system" : "kafka",
>>>>>             "partition" : 1,
>>>>>             "stream" : "test_topic2"
>>>>>           } ],
>>>>>           "changelog-partition" : 1
>>>>>         }
>>>>>       },
>>>>>       "container-id" : 0,
>>>>>       "processor-id" : "0"
>>>>>     }
>>>>>   },
>>>>>   "max-change-log-stream-partitions" : 2,
>>>>>   "all-container-locality" : {
>>>>>     "0" : null
>>>>>   }
>>>>> }
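The quit condition in that log message reduces to a membership check: the JobModel above is keyed by numeric container ids ("0"), so a lookup by the processor's UUID can never succeed. A toy illustration with plain collections (not Samza's actual classes):

```java
import java.util.*;

public class JobModelCheck {
    // The processor-quit condition from the log above, reduced to its core:
    // each processor stops if its own id is absent from the published model.
    // Here the container id "0" was used as the key, so a UUID pid never matches.
    static boolean containsProcessor(Set<String> modelProcessorIds, String myPid) {
        return modelProcessorIds.contains(myPid);
    }

    public static void main(String[] args) {
        Set<String> model = new HashSet<>(Arrays.asList("0")); // keys from the JSON above
        String pid = "38c637bf-9c2b-4856-afc4-5b1562711cfb";
        System.out.println(containsProcessor(model, pid)); // false -> processor stops
    }
}
```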
>>>>>
>>>>> -----Original Message-----
>>>>> From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>>>> Sent: Friday, March 16, 2018 18:21
>>>>> To: dev@samza.apache.org
>>>>> Cc: Jagadish Venkatraman <jagadish1989@gmail.com>;
>>>>> tom@recursivedream.com; Yipan@linkedin.com; Yi Pan
>>>>> <nickpan47@gmail.com>
>>>>> Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>>>>
>>>>> Attached. I don't see any threads actually running this code which
>>>>> is odd.
>>>>>
>>>>> There's my main thread that's waiting for the whole thing to
>>>>> finish, the "debounce-thread-0" (which logged the other surrounding
>>>>> messages below) has
>>>>> this:
>>>>>
>>>>> "debounce-thread-0" #18 daemon prio=5 os_prio=0 tid=0x00007fa0fd719800 nid=0x21 waiting on condition [0x00007fa0d0d45000]
>>>>>    java.lang.Thread.State: WAITING (parking)
>>>>>         at sun.misc.Unsafe.park(Native Method)
>>>>>         - parking to wait for  <0x00000006f166e350> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>>>>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>>    Locked ownable synchronizers:
>>>>>         - None
>>>>>
>>>>> Thanks for having a look.
>>>>> Thunder
>>>>>
>>>>>
>>>>> -----Original Message-----
>>>>> From: Prateek Maheshwari [mailto:prateekmi2@gmail.com]
>>>>> Sent: Friday, March 16, 2018 17:02
>>>>> To: dev@samza.apache.org
>>>>> Cc: Jagadish Venkatraman <jagadish1989@gmail.com>;
>>>>> tom@recursivedream.com; Yipan@linkedin.com; Yi Pan
>>>>> <nickpan47@gmail.com>
>>>>> Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>>>>>
>>>>> Hi Thunder,
>>>>>
>>>>> Can you please take and attach a thread dump with this?
>>>>>
>>>>> Thanks,
>>>>> Prateek
>>>>>
>>>>> On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges
>>>>> <tstumpges@ntent.com>
>>>>> wrote:
>>>>>
>>>>> > It appears it IS hung while serializing the JobModel... very strange!
>>>>> > I added some debug statements around the calls:
>>>>> >
>>>>> >       LOG.debug("Getting object mapper to serialize job model"); // this IS printed
>>>>> >       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>>>>> >       LOG.debug("Serializing job model"); // this IS printed
>>>>> >       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>>>>> >       LOG.info("jobModelAsString=" + jobModelStr); // this is NOT printed!
>>>>> >
>>>>> > Another thing I noticed is that "getObjectMapper" actually
>>>>> > creates the object mapper twice!
>>>>> >
>>>>> > 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG
>>>>> > org.apache.samza.zk.ZkUtils - Getting object mapper to serialize
>>>>> > job model
>>>>> > 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG
>>>>> > o.a.s.s.model.SamzaObjectMapper
>>>>> > - Creating new object mapper and simple module
>>>>> > 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG
>>>>> > o.a.s.s.model.SamzaObjectMapper
>>>>> > - Adding SerDes and mixins
>>>>> > 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG
>>>>> > o.a.s.s.model.SamzaObjectMapper
>>>>> > - Adding custom ContainerModel deserializer
>>>>> > 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG
>>>>> > o.a.s.s.model.SamzaObjectMapper
>>>>> > - Setting up naming strategy and registering module
>>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>>>> > o.a.s.s.model.SamzaObjectMapper
>>>>> > - Done!
>>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>>>> > o.a.s.s.model.SamzaObjectMapper
>>>>> > - Creating new object mapper and simple module
>>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>>>> > o.a.s.s.model.SamzaObjectMapper
>>>>> > - Adding SerDes  and mixins
>>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>>>> > o.a.s.s.model.SamzaObjectMapper
>>>>> > - Adding custom ContainerModel deserializer
>>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>>>> > o.a.s.s.model.SamzaObjectMapper
>>>>> > - Setting up naming strategy and registering module
>>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>>>> > o.a.s.s.model.SamzaObjectMapper
>>>>> > - Done!
>>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
>>>>> > org.apache.samza.zk.ZkUtils - Serializing job model
>>>>> >
>>>>> > Could this ObjectMapper be a singleton? I see there is a private
>>>>> > static instance, but getObjectMapper creates a new one every time...
>>>>> >
>>>>> > Anyway, then it takes off to serialize the job model and never
>>>>> > comes back...
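On the singleton question above: one standard way to build an expensive object exactly once, lazily and thread-safely, is the initialization-on-demand holder idiom, sketched here with a stand-in class. (Whether caching is actually safe for `SamzaObjectMapper` depends on the mapper's thread-safety after configuration; this only shows the caching pattern.)

```java
public class MapperHolder {
    static int constructed = 0;

    // Stand-in for the expensive SamzaObjectMapper setup
    static final class Mapper {
        Mapper() { constructed++; }
    }

    // Initialization-on-demand holder idiom: Holder.INSTANCE is built
    // exactly once, lazily and thread-safely, by class initialization.
    private static class Holder {
        static final Mapper INSTANCE = new Mapper();
    }

    public static Mapper getObjectMapper() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        Mapper a = getObjectMapper();
        Mapper b = getObjectMapper();
        System.out.println(a == b);      // true: same instance
        System.out.println(constructed); // 1: constructor ran once
    }
}
```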
>>>>> >
>>>>> > Hoping someone has some idea here... the implementation for this
>>>>> > mostly comes from Jackson-mapper-asl, and I have the version that
>>>>> > is linked in the
>>>>> > 0.14.0 tag:
>>>>> > |    |    |    +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
>>>>> > |    |    |    |    \--- org.codehaus.jackson:jackson-core-asl:1.9.13
>>>>> >
>>>>> > Thanks!
>>>>> > Thunder
>>>>> >
>>>>> > -----Original Message-----
>>>>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>>>> > Sent: Friday, March 16, 2018 15:29
>>>>> > To: dev@samza.apache.org; Jagadish Venkatraman
>>>>> > <jagadish1989@gmail.com>
>>>>> > Cc: tom@recursivedream.com; Yipan@linkedin.com; Yi Pan <
>>>>> > nickpan47@gmail.com>
>>>>> > Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>>>> >
>>>>> > So, my investigation starts at StreamProcessor.java.  Line 294 in
>>>>> > method
>>>>> > onNewJobModel() logs an INFO message that it's starting a container.
>>>>> > This message never appears.
>>>>> >
>>>>> > I see that ZkJobCoordinator calls onNewJobModel from its
>>>>> > onNewJobModelConfirmed method which also logs an info message
>>>>> > stating "version X of the job model got confirmed". I never see
>>>>> > this message either, so I go up the chain some more.
>>>>> >
>>>>> > I DO see:
>>>>> >
>>>>> > 2018-03-16 21:43:58 logback 20498
>>>>> > [ZkClient-EventThread-13-10.0.127.114:2181]
>>>>> > INFO  o.apache.samza.zk.ZkJobCoordinator -
>>>>> > ZkJobCoordinator::onBecomeLeader
>>>>> > - I became the leader!
>>>>> > And
>>>>> > 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO
>>>>> > o.apache.samza.zk.ZkJobCoordinator -
>>>>> > pid=91e07d20-ae33-4156-a5f3-534a95642133Generated
>>>>> > new Job Model. Version = 1
>>>>> >
>>>>> > Which led me to method onDoProcessorChange line 210. I see that
>>>>> > line, but not the line below " Published new Job Model. Version
>>>>> > =" so something in here is not completing:
>>>>> >
>>>>> >     LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
>>>>> >
>>>>> >     // Publish the new job model
>>>>> >     zkUtils.publishJobModel(nextJMVersion, jobModel);
>>>>> >
>>>>> >     // Start the barrier for the job model update
>>>>> >     barrier.create(nextJMVersion, currentProcessorIds);
>>>>> >
>>>>> >     // Notify all processors about the new JobModel by updating
>>>>> > JobModel Version number
>>>>> >     zkUtils.publishJobModelVersion(currentJMVersion,
>>>>> > nextJMVersion);
>>>>> >
>>>>> >     LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
>>>>> >
>>>>> > As I mentioned, after the line "Generated new Job Model. Version = 1"
>>>>> > I just get repeated zk ping responses.. no more application logging.
>>>>> >
>>>>> > The very next thing that's run is zkUtils.publishJobModel() which
>>>>> > only has two lines before another log statement (which I don't see):
>>>>> >
>>>>> >   public void publishJobModel(String jobModelVersion, JobModel jobModel) {
>>>>> >     try {
>>>>> >       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>>>>> >       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>>>>> >       LOG.info("jobModelAsString=" + jobModelStr);
>>>>> >       ...
>>>>> >
>>>>> > Could it really be getting hung up on one of these two lines?
>>>>> > (seems like it must be, but I don't see anything there that seems
>>>>> > like it would just hang). I'll keep troubleshooting, maybe add
>>>>> > some more debug logging and try again.
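One generic debugging technique for a suspected hang like this is to run the suspect call on a watchdog thread with a timeout, so a silent hang becomes a visible result instead of a stuck thread. This is purely illustrative (names are invented, and it is not how Samza publishes the model):

```java
import java.util.concurrent.*;

public class HangProbe {
    // Run a suspect call with a timeout so a silent hang turns into a
    // visible "HUNG" result instead of a thread stuck forever.
    public static String probe(Callable<String> suspect, long timeoutMs) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(suspect).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "HUNG";
        } catch (Exception e) {
            return "FAILED: " + e.getCause();
        } finally {
            pool.shutdownNow(); // interrupt the stuck worker
        }
    }

    public static void main(String[] args) {
        // A fast call completes normally; a stalled one is reported as HUNG.
        System.out.println(probe(() -> "serialized-ok", 1000));
        System.out.println(probe(() -> { Thread.sleep(5000); return "x"; }, 200));
    }
}
```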
>>>>> >
>>>>> > Thanks for any guidance you all might have.
>>>>> > -Thunder
>>>>> >
>>>>> >
>>>>> > -----Original Message-----
>>>>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>>>> > Sent: Friday, March 16, 2018 14:43
>>>>> > To: dev@samza.apache.org; Jagadish Venkatraman
>>>>> > <jagadish1989@gmail.com>
>>>>> > Cc: tom@recursivedream.com; Yipan@linkedin.com; Yi Pan <
>>>>> > nickpan47@gmail.com>
>>>>> > Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>>>> >
>>>>> > Well I have my stand-alone application in docker and running in
>>>>> > kubernetes. I think something isn't wired up all the way though,
>>>>> > because my task never actually gets invoked. I see no errors,
>>>>> > however I'm not getting the usual startup logs (checking existing
>>>>> > offsets, "entering run loop"...) My logs look like this:
>>>>> >
>>>>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO
>>>>> > kafka.utils.VerifiableProperties
>>>>> > - Verifying properties
>>>>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO
>>>>> > kafka.utils.VerifiableProperties
>>>>> > - Property client.id is overridden to
>>>>> > samza_admin-test_stream_task-1
>>>>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO
>>>>> > kafka.utils.VerifiableProperties
>>>>> > - Property metadata.broker.list is overridden to
>>>>> > test-kafka-kafka.test-svc:9092
>>>>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO
>>>>> > kafka.utils.VerifiableProperties
>>>>> > - Property request.timeout.ms is overridden to 30000
>>>>> > 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO
>>>>> > kafka.client.ClientUtils$ - Fetching metadata from broker
>>>>> > BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation
>>>>> > id 0 for 1 topic(s) Set(dev_k8s.samza.test.topic)
>>>>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG
>>>>> > kafka.network.BlockingChannel - Created socket with SO_TIMEOUT =
>>>>> > 30000 (requested 30000), SO_RCVBUF = 179680 (requested -1),
>>>>> > SO_SNDBUF =
>>>>> > 102400 (requested 102400), connectTimeoutMs = 30000.
>>>>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO
>>>>> > kafka.producer.SyncProducer - Connected to
>>>>> > test-kafka-kafka.test-svc:9092 for producing
>>>>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO
>>>>> > kafka.producer.SyncProducer - Disconnecting from
>>>>> > test-kafka-kafka.test-svc:9092
>>>>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG
>>>>> > kafka.client.ClientUtils$ - Successfully fetched metadata for 1
>>>>> > topic(s)
>>>>> > Set(dev_k8s.samza.test.topic)
>>>>> > 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO
>>>>> > o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper
>>>>> > org.apache.samza.container.grouper.stream.GroupByPartition@1a7158
>>>>> > cc has grouped the SystemStreamPartitions into 10 tasks with the
>>>>> > following
>>>>> > taskNames: [Partition 1, Partition 0, Partition 3, Partition 2,
>>>>> > Partition 5, Partition 4, Partition 7, Partition 6, Partition 9,
>>>>> > Partition 8]
>>>>> > 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO
>>>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 0 is
>>>>> > being assigned changelog partition 0.
>>>>> > 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO
>>>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 1 is
>>>>> > being assigned changelog partition 1.
>>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 2 is
>>>>> > being assigned changelog partition 2.
>>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 3 is
>>>>> > being assigned changelog partition 3.
>>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 4 is
>>>>> > being assigned changelog partition 4.
>>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 5 is
>>>>> > being assigned changelog partition 5.
>>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 6 is
>>>>> > being assigned changelog partition 6.
>>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 7 is
>>>>> > being assigned changelog partition 7.
>>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 8 is
>>>>> > being assigned changelog partition 8.
>>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
>>>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 9 is
>>>>> > being assigned changelog partition 9.
>>>>> > 2018-03-16 21:05:55 logback 50838 [main-SendThread(10.0.127.114:2181)]
>>>>> > DEBUG org.apache.zookeeper.ClientCnxn - Reading reply
>>>>> > sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null
>>>>> > serverPath:null finished:false header:: 23,4  replyHeader:: 23,14024,0  request::
>>>>> > '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/
>>>>> > JobModelGeneration/jobModelVersion,T  response::
>>>>> > ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878}
>>>>> > 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO
>>>>> > o.apache.samza.zk.ZkJobCoordinator -
>>>>> > pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated
>>>>> > new Job Model. Version = 1
>>>>> > 2018-03-16 21:06:05 logback 60848 [main-SendThread(10.0.127.114:2181)]
>>>>> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid:
>>>>> > 0x1622c8b5fc01ac7 after 2ms
>>>>> > 2018-03-16 21:06:15 logback 70856 [main-SendThread(10.0.127.114:2181)]
>>>>> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid:
>>>>> > 0x1622c8b5fc01ac7 after 1ms
>>>>> > 2018-03-16 21:06:25 logback 80865 [main-SendThread(10.0.127.114:2181)]
>>>>> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid:
>>>>> > 0x1622c8b5fc01ac7 after 2ms ...
>>>>> >
>>>>> > The zk ping responses continue every 10 seconds, but no other
>>>>> > activity or messages occur.
>>>>> > It looks like it gets as far as confirming the JobModel and
>>>>> > grouping the partitions, but nothing actually starts up.
>>>>> >
>>>>> > Any ideas?
>>>>> > Thanks in advance!
>>>>> > Thunder
>>>>> >
>>>>> >
>>>>> > -----Original Message-----
>>>>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>>>> > Sent: Thursday, March 15, 2018 16:35
>>>>> > To: Jagadish Venkatraman <jagadish1989@gmail.com>
>>>>> > Cc: dev@samza.apache.org; tom@recursivedream.com;
>>>>> > Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>>>>> > Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>>>> >
>>>>> > Thanks a lot for the info. I have something basically working at
>>>>> > this point! I have not integrated it with Docker nor Kubernetes
>>>>> > yet, but it does run from my local machine.
>>>>> >
>>>>> > I have determined that LocalApplicationRunner does NOT do config
>>>>> > rewriting. I had to write my own little “StandAloneApplicationRunner”
>>>>> > that handles the “main” entrypoint. It does command parsing using
>>>>> > CommandLine, load config from ConfigFactory, and perform
>>>>> > rewriting before creating the new instance of
>>>>> > LocalApplicationRunner. This is all my StandAloneApplicationRunner contains:
>>>>> >
>>>>> >
>>>>> > object StandAloneSamzaRunner extends App with LazyLogging {
>>>>> >
>>>>> >   // parse command line args just like JobRunner.
>>>>> >   val cmdline = new ApplicationRunnerCommandLine
>>>>> >   val options = cmdline.parser.parse(args: _*)
>>>>> >   val config = cmdline.loadConfig(options)
>>>>> >
>>>>> >   val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
>>>>> >   runner.runTask()
>>>>> >   runner.waitForFinish()
>>>>> > }
>>>>> >
>>>>> > The only config settings I needed to make to use this runner were
>>>>> > (easily configured due to our central Consul config system and our rewriter):
>>>>> >
>>>>> > # use the ZK based job coordinator
>>>>> > job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
>>>>> > # need to use GroupByContainerIds instead of GroupByContainerCount
>>>>> > task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
>>>>> > # ZKJC config
>>>>> > job.coordinator.zk.connect=<our_zk_connection>
>>>>> >
>>>>> > I did run into one potential problem; as you see above, I have
>>>>> > started the task using runTask() and then to prevent my main
>>>>> > method from returning, I have called waitForFinish(). The first
>>>>> > time I ran it, the job itself failed because I had forgotten to
>>>>> > override the task grouper, and container count was pulled from our staging environment.
>>>>> > There are some failures logged and it appears the JobCoordinator
>>>>> > fails, but it never returns from waitForFinish. Stack trace and
>>>>> > continuation of log is below:
>>>>> >
>>>>> > 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR
>>>>> > o.a.s.zk.ScheduleAfterDebounceTime
>>>>> > - Execution of action: OnProcessorChange failed.
>>>>> > java.lang.IllegalArgumentException: Your container count (4) is larger
>>>>> > than your task count (2). Can't have containers with nothing to
>>>>> > do, so aborting.
>>>>> >        at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:212)
>>>>> >        at org.apache.samza.container.grouper.task.GroupByContainerCount.group(GroupByContainerCount.java:62)
>>>>> >        at org.apache.samza.container.grouper.task.TaskNameGrouper.group(TaskNameGrouper.java:56)
>>>>> >        at org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:266)
>>>>> >        at org.apache.samza.coordinator.JobModelManager.readJobModel(JobModelManager.scala)
>>>>> >        at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJobCoordinator.java:306)
>>>>> >        at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:197)
>>>>> >        at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:318)
>>>>> >        at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134)
>>>>> >        at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>>>>> >        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>>>>> >        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>>>>> >        at java.util.concurrent.FutureTask.run(FutureTask.java)
>>>>> >        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>> >        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>> >        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> >        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> >        at java.lang.Thread.run(Thread.java:745)
>>>>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG
>>>>> > o.a.samza.processor.StreamProcessor - Container is not instantiated yet.
>>>>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG
>>>>> > org.I0Itec.zkclient.ZkClient - Closing ZkClient...
>>>>> > 2018-03-15 22:34:32 logback 77789
>>>>> > [ZkClient-EventThread-15-10.0.127.114:2181]
>>>>> > INFO  org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
>>>>> >
>>>>> > And then the application continues on with metric reporters, and
>>>>> > other debug logging (not actually running the task though)
>>>>> >
>>>>> > Thanks in advance for the guidance, this has been easier than I imagined!
>>>>> > I’ll report back when I get more of the Dockerization/Kubernetes
>>>>> > running and test it a bit more.
>>>>> > Cheers,
>>>>> > Thunder
>>>>> >
>>>>> >
>>>>> > From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com]
>>>>> > Sent: Thursday, March 15, 2018 14:46
>>>>> > To: Thunder Stumpges <tstumpges@ntent.com>
>>>>> > Cc: dev@samza.apache.org; tom@recursivedream.com;
>>>>> > Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>>>>> > Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>>>>> >
>>>>> > >>  Thanks for the info on the tradeoffs. That makes a lot of
>>>>> > >> sense. I am
>>>>> > on-board with using ZkJobCoordinator, sounds like some good
>>>>> > benefits over just the Kafka high-level consumer.
>>>>> >
>>>>> > This certainly looks like the simplest alternative.
>>>>> >
>>>>> > For your other questions, please find my answers inline.
>>>>> >
>>>>> > >> Q1: If I use LocalApplicationRunner, It does not use
>>>>> > "ProcessJobFactory" (or any StreamJob or *Job classes) correct?
>>>>> >
>>>>> > Your understanding is correct. It directly instantiates the
>>>>> > StreamProcessor, which in-turn creates and runs the SamzaContainer.
>>>>> >
>>>>> > >> Q2: If I use LocalApplicationRunner, I will need to code
>>>>> > >> myself the
>>>>> > loading and rewriting of the Config that is currently handled by
>>>>> > JobRunner, correct?
>>>>> >
>>>>> > I don't think you'll need to do this. IIUC, the
>>>>> > LocalApplicationRunner should automatically invoke rewriters and do the right thing.
>>>>> >
>>>>> > >>  Q3: Do I need to also handle coordinator stream(s) and
>>>>> > >> storing of
>>>>> > config that is done in JobRunner (I don’t think so as the ?
>>>>> >
>>>>> > I don't think this is necessary either. The creation of
>>>>> > coordinator stream and persisting configuration happens in the
>>>>> > LocalApplicationRunner (more specifically in StreamManager#createStreams).
>>>>> >
>>>>> > >> Q4: Where/How do I specify the Container ID for each instance?
>>>>> > >> Is there
>>>>> > a config setting that I can pass, (or pull from an env variable
>>>>> > and add to the config) ? I am assuming it is my responsibility to
>>>>> > ensure that each instance is started with a unique container ID..?
>>>>> >
>>>>> > Nope, If you are using the ZkJobCoordinator, you need not have to
>>>>> > worry about assigning IDs for each instance. The framework will
>>>>> > automatically take care of generating IDs and reaching consensus
>>>>> > by electing a leader. If you are curious please take a look at
>>>>> > implementations of the ProcessorIdGenerator interface.
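For intuition, the UUID-style ids visible in the logs earlier in this thread (pid=38c637bf-…) can be produced by something as simple as the sketch below. The real extension point is Samza's ProcessorIdGenerator interface, whose exact signature should be checked against the version in use; this sketch only shows the uniqueness guarantee such a generator needs.

```java
import java.util.UUID;

public class ProcessorIdSketch {
    // A processor id must be globally unique across instances and stable
    // for the lifetime of the process; a random UUID satisfies both when
    // generated once at startup and reused thereafter.
    public static String generateProcessorId() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        String a = generateProcessorId();
        String b = generateProcessorId();
        System.out.println(!a.equals(b)); // true: ids are unique per call
    }
}
```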
>>>>> >
>>>>> > Please let us know should you have further questions!
>>>>> >
>>>>> > Best,
>>>>> > Jagdish
>>>>> >
>>>>> > On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges
>>>>> > <tstumpges@ntent.com> wrote:
>>>>> >
>>>>> > Thanks for the info on the tradeoffs. That makes a lot of sense.
>>>>> > I am on-board with using ZkJobCoordinator, sounds like some good
>>>>> > benefits over just the Kafka high-level consumer.
>>>>> >
>>>>> >
>>>>> >
>>>>> > To that end, I have made some notes on possible approaches based
>>>>> > on the previous thread, and from my look into the code. I’d love
>>>>> > to get feedback.
>>>>> >
>>>>> >
>>>>> >
>>>>> > Approach 1. Configure jobs to use “ProcessJobFactory” and run
>>>>> > instances of the job using run-job.sh or using JobRunner directly.
>>>>> >
>>>>> > I don’t think this makes sense from what I can see for a few reasons:
>>>>> >
>>>>> >   *   JobRunner is concerned with stuff I don't *think* we need:
>>>>> >
>>>>> >      *   coordinatorSystemProducer|Consumer,
>>>>> >      *   writing/reading the configuration to the coordinator streams
>>>>> >
>>>>> >   *   ProcessJobFactory hard-codes the ID to “0” so I don’t think that
>>>>> > will work for multiple instances.
>>>>> >
>>>>> >
>>>>> >
>>>>> > Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and
>>>>> > invoke
>>>>> > LocalApplicationRunner.runTask()
>>>>> >
>>>>> >
>>>>> >
>>>>> >     Q1: If I use LocalApplicationRunner, It does not use
>>>>> > "ProcessJobFactory" (or any StreamJob or *Job classes) correct?
>>>>> >
>>>>> >     Q2: If I use LocalApplicationRunner, I will need to code
>>>>> > myself the loading and rewriting of the Config that is currently
>>>>> > handled by JobRunner, correct?
>>>>> >
>>>>> >     Q3: Do I need to also handle coordinator stream(s) and
>>>>> > storing of config that is done in JobRunner (I don’t think so as the ?
>>>>> >
>>>>> >     Q4: Where/how do I specify the Container ID for each
>>>>> > instance? Is there a config setting that I can pass (or pull
>>>>> > from an env variable and add to the config)? I am assuming it is
>>>>> > my responsibility to ensure that each instance is started with a unique container ID?
>>>>> >
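[Editor's note: for reference, here is a config sketch along the lines of Approach 2. The property names reflect my reading of the 0.13/0.14 standalone code and docs, and the task class name is a placeholder, so treat the whole fragment as an assumption to verify against your Samza version.]

```properties
# Coordinate via ZooKeeper instead of YARN (standalone deployment)
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.coordinator.zk.connect=zk-host:2181
# Group tasks by container/processor id
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
# Low-level API entry point (placeholder class name)
task.class=com.example.MyStreamTask
```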
>>>>> > I am getting started on the above (Approach 2.), and looking
>>>>> > closer at the code so I may have my own answers to my questions,
>>>>> > but figured I should go ahead and ask now anyway. Thanks!
>>>>> >
>>>>> > -Thunder
>>>>> >
>>>>> >
>>>>> > From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com<mailto:
>>>>> > jagadish1989@gmail.com>]
>>>>> > Sent: Thursday, March 15, 2018 1:41
>>>>> > To: dev@samza.apache.org<mailto:dev@samza.apache.org>; Thunder
>>>>> > Stumpges < tstumpges@ntent.com<mailto:tstumpges@ntent.com>>;
>>>>> > tom@recursivedream.com <mailto:tom@recursivedream.com>
>>>>> > Cc: Yipan@linkedin.com<mailto:Yipan@linkedin.com>; Yi Pan <
>>>>> > nickpan47@gmail.com<mailto:nickpan47@gmail.com>>
>>>>> >
>>>>> > Subject: Re: Old style "low level" Tasks with alternative
>>>>> > deployment
>>>>> > model(s)
>>>>> >
>>>>> > >> You are correct that this is focused on the higher-level API
>>>>> > >> but doesn't
>>>>> > preclude using the lower-level API. I was at the same point you
>>>>> > were not long ago, in fact, and had a very productive
>>>>> > conversation on the list
>>>>> >
>>>>> > Thanks Tom for linking the thread, and I'm glad that you were
>>>>> > able to get Kubernetes integration working with Samza.
>>>>> >
>>>>> > >> If it is helpful for everyone, once I get the low-level API +
>>>>> > >> ZkJobCoordinator + Docker +
>>>>> > K8s working, I'd be glad to formulate an additional sample for
>>>>> hello-samza.
>>>>> >
>>>>> > @Thunder Stumpges:
>>>>> > We'd be thrilled to receive your contribution. Examples, demos,
>>>>> > tutorials etc.
>>>>> > contribute a great deal to improving the ease of use of Apache Samza.
>>>>> > I'm happy to shepherd design discussions/code-reviews in the
>>>>> > open-source including answering any questions you may have.
>>>>> >
>>>>> >
>>>>> > >> One thing I'm still curious about, is what are the drawbacks
>>>>> > >> or complexities of leveraging the Kafka High-level consumer +
>>>>> > >> PassthroughJobCoordinator in a stand-alone setup like this? We
>>>>> > >> do have Zookeeper (because of kafka) so I think either would
>>>>> > >> work. The Kafka High-level consumer comes with other nice
>>>>> > >> tools for monitoring offsets, lag, etc
>>>>> >
>>>>> >
>>>>> > @Thunder Stumpges:
>>>>> >
>>>>> > Samza uses a "Job-Coordinator" to assign your input-partitions
>>>>> > among the different instances of your application s.t. they don't
>>>>> > overlap. A typical way to solve this "partition distribution"
>>>>> > problem is to have a single instance elected as a "leader" and
>>>>> > have the leader assign partitions to the group.
>>>>> > The ZkJobCoordinator uses Zk primitives to achieve this, while
>>>>> > the YarnJC relies on Yarn's guarantee that there will be a
>>>>> > singleton-AppMaster to achieve this.
>>>>> >
>>>>> > A key difference that separates the PassthroughJC from the
>>>>> > Yarn/Zk variants is that it does _not_ attempt to solve the
>>>>> > "partition distribution" problem. As a result, there's no
>>>>> > leader-election
>>>>> involved.
>>>>> > Instead, it pushes the problem of "partition distribution" to the
>>>>> > underlying consumer.
>>>>> >
>>>>> > The PassthroughJC supports these two scenarios:
>>>>> >
>>>>> > 1. Consumer-managed partition distribution: When using the Kafka
>>>>> > high-level consumer (or an AWS KinesisClientLibrary consumer)
>>>>> > with Samza, the consumer manages partitions internally.
>>>>> >
>>>>> > 2. Static partition distribution: Alternately, partitions can be
>>>>> > managed statically using configuration. You can achieve static
>>>>> > partition assignment by implementing a custom SystemStreamPartitionGrouper
>>>>> > <https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>
>>>>> > and TaskNameGrouper
>>>>> > <https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>.
>>>>> > Solutions in this category will typically require you to
>>>>> > distinguish the various processors in the group by providing an "id" for each.
>>>>> > Once the "id"s are decided, you can then statically compute
>>>>> > assignments using a function (eg: modulo N).
>>>>> > You can rely on the following mechanisms to provide this id:
>>>>> >  - Configure each instance differently to have its own id
>>>>> >  - Obtain the id from the cluster-manager. For instance,
>>>>> > Kubernetes will provide each POD a unique id in the range [0,N).
>>>>> > AWS ECS should expose similar capabilities via a REST end-point.
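[Editor's note: as an illustration of the modulo-N idea above, here is a minimal sketch in plain Java. It is independent of Samza's actual grouper interfaces, and the `POD_ORDINAL` env-var name is an assumption about how a cluster manager might inject the id.]

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of static partition distribution:
// processor i owns partition p iff p % numProcessors == i.
public class StaticPartitionAssignment {

    static List<Integer> partitionsFor(int processorId, int numProcessors, int totalPartitions) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < totalPartitions; p++) {
            if (p % numProcessors == processorId) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // The id could come from per-instance config, or from the cluster
        // manager, e.g. a StatefulSet ordinal exposed as an env var
        // (variable name is hypothetical).
        String ordinal = System.getenv().getOrDefault("POD_ORDINAL", "0");
        int processorId = Integer.parseInt(ordinal);
        // 3 processors sharing 8 partitions get disjoint, exhaustive sets.
        System.out.println(partitionsFor(processorId, 3, 8));
    }
}
```

Each instance computes its own assignment locally from (id, N), so no coordination traffic is needed, at the cost of rebalancing only by reconfiguring N.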
>>>>> >
>>>>> > >> One thing I'm still curious about, is what are the drawbacks
>>>>> > >> or
>>>>> > complexities of leveraging the Kafka High-level consumer +
>>>>> > PassthroughJobCoordinator in a stand-alone setup like this?
>>>>> >
>>>>> > Leveraging the Kafka High-level consumer:
>>>>> >
>>>>> > The Kafka high-level consumer is not integrated into Samza just yet.
>>>>> > Instead, Samza's integration with Kafka uses the low-level
>>>>> > consumer because
>>>>> > i) It allows for greater control in fetching data from individual
>>>>> brokers.
>>>>> > It is simple and performant in terms of the threading model to
>>>>> > have one thread pull from each broker.
>>>>> > ii) It is efficient in memory utilization since it does not do
>>>>> > internal-buffering of messages.
>>>>> > iii) There's no overhead like the group-coordinator heart-beats
>>>>> > that are driven by consumer.poll()
>>>>> >
>>>>> > Since there's no built-in integration, you will have to build a
>>>>> > new SystemConsumer if you need to integrate with the Kafka
>>>>> > High-level
>>>>> consumer.
>>>>> > Further, there's a fair bit of complexity to manage in
>>>>> checkpointing.
>>>>> >
>>>>> > >> The Kafka High-level consumer comes with other nice tools for
>>>>> > >> monitoring offsets, lag, etc
>>>>> >
>>>>> > Samza exposes
>>>>> > <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
>>>>> > the below metrics for lag-monitoring:
>>>>> > - The current log-end offset for each partition
>>>>> > - The last check-pointed offset for each partition
>>>>> > - The number of messages behind the highwatermark of the
>>>>> > partition
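[Editor's note: the lag arithmetic from those metrics is a simple sketch like the one below. This is not Samza API, just the calculation; the offset convention (high watermark = next offset to be appended, checkpoint = last processed offset) is an assumption to check against your setup.]

```java
// Compute "messages behind high watermark" from the two offset metrics.
public class LagCheck {

    static long messagesBehind(long highWatermark, long lastCheckpointedOffset) {
        // The next offset to process is lastCheckpointedOffset + 1;
        // clamp at zero in case the checkpoint is already caught up.
        return Math.max(0, highWatermark - (lastCheckpointedOffset + 1));
    }

    public static void main(String[] args) {
        System.out.println(messagesBehind(100, 89)); // 10 messages behind
    }
}
```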
>>>>> >
>>>>> > Please let us know if you need help discovering these or
>>>>> > integrating these with other systems/tools.
>>>>> >
>>>>> >
>>>>> > Leveraging the Passthrough JobCoordinator:
>>>>> >
>>>>> > It's helpful to split this discussion on tradeoffs with
>>>>> > PassthroughJC into
>>>>> > 2 parts:
>>>>> >
>>>>> > 1. PassthroughJC + consumer managed partitions:
>>>>> >
>>>>> > - In this model, Samza has no control over partition-assignment
>>>>> > since it's managed by the consumer. This means that stateful
>>>>> > operations like joins that rely on partitions being co-located on
>>>>> > the same task will
>>>>> not work.
>>>>> > Simple stateless operations (eg: map, filter, remote lookups) are
>>>>> fine.
>>>>> >
>>>>> > - A key differentiator between Samza and other frameworks is our
>>>>> > support for "host affinity"
>>>>> > <https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>.
>>>>> > Samza achieves this by assigning
>>>>> > partitions to hosts taking data-locality into account. If the
>>>>> > consumer can arbitrarily shuffle partitions, it'd be hard to
>>>>> > support this affinity/locality. Often this is a key optimization
>>>>> > when dealing with large stateful jobs.
>>>>> >
>>>>> > 2. PassthroughJC + static partitions:
>>>>> >
>>>>> > - In this model, it is possible to make stateful processing
>>>>> > (including host affinity) work by carefully choosing how "id"s
>>>>> > are assigned and computed.
>>>>> >
>>>>> > Recommendation:
>>>>> >
>>>>> > - Owing to the above subtleties, I would recommend that we give
>>>>> > the ZkJobCoordinator + the built-in low-level Kafka integration a try.
>>>>> > - If we hit snags down this path, we can certainly explore the
>>>>> > approach with PassthroughJC + static partitions.
>>>>> > - Using the PassthroughJC + consumer-managed distribution would
>>>>> > be least preferable owing to the subtleties I outlined above.
>>>>> >
>>>>> > Please let us know should you have more questions.
>>>>> >
>>>>> > Best,
>>>>> > Jagdish
>>>>> >
>>>>> > On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges <
>>>>> tstumpges@ntent.com
>>>>> > <mailto:tstumpges@ntent.com>> wrote:
>>>>> > Wow, what great timing, and what a great thread! I definitely
>>>>> > have some good starters to go off of here.
>>>>> >
>>>>> > If it is helpful for everyone, once I get the low-level API +
>>>>> > ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate
>>>>> > an additional sample for hello-samza.
>>>>> >
>>>>> > One thing I'm still curious about, is what are the drawbacks or
>>>>> > complexities of leveraging the Kafka High-level consumer +
>>>>> > PassthroughJobCoordinator in a stand-alone setup like this? We do
>>>>> > have Zookeeper (because of kafka) so I think either would work.
>>>>> > The Kafka High-level consumer comes with other nice tools for
>>>>> > monitoring offsets, lag, etc....
>>>>> >
>>>>> > Thanks guys!
>>>>> > -Thunder
>>>>> >
>>>>> > -----Original Message-----
>>>>> > From: Tom Davis [mailto:tom@recursivedream.com<mailto:
>>>>> > tom@recursivedream.com>]
>>>>> > Sent: Wednesday, March 14, 2018 17:50
>>>>> > To: dev@samza.apache.org<mailto:dev@samza.apache.org>
>>>>> > Subject: Re: Old style "low level" Tasks with alternative
>>>>> > deployment
>>>>> > model(s)
>>>>> >
>>>>> > Hey there!
>>>>> >
>>>>> > You are correct that this is focused on the higher-level API but
>>>>> > doesn't preclude using the lower-level API. I was at the same
>>>>> > point you were not long ago, in fact, and had a very productive
>>>>> > conversation
>>>>> on the list:
>>>>> > you should look for "Question about custom StreamJob/Factory" in
>>>>> > the list archive for the past couple months.
>>>>> >
>>>>> > I'll quote Jagadish Venkatraman from that thread:
>>>>> >
>>>>> > > For the section on the low-level API, can you use
>>>>> > > LocalApplicationRunner#runTask()? It basically creates a new
>>>>> > > StreamProcessor and runs it. Remember to provide task.class and
>>>>> > > set it to your implementation of StreamTask or AsyncStreamTask.
>>>>> > > Please note that this is an evolving API and hence, subject to change.
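[Editor's note: concretely, the config that pairs with LocalApplicationRunner#runTask() is a fragment like the one below; the job name and task class are placeholders, and the exact property set should be verified against your Samza version.]

```properties
job.name=my-low-level-job
# StreamTask or AsyncStreamTask implementation (placeholder class name)
task.class=com.example.MyStreamTask
```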
>>>>> >
>>>>> > I ended up just switching to the high-level API because I don't
>>>>> > have any existing Tasks and the Kubernetes story is a little more
>>>>> > straightforward there (there's only one container/configuration to deploy).
>>>>> >
>>>>> > Best,
>>>>> >
>>>>> > Tom
>>>>> >
>>>>> > Thunder Stumpges
>>>>> > <tstumpges@ntent.com<mailto:tstumpges@ntent.com>>
>>>>> writes:
>>>>> >
>>>>> > > Hi all,
>>>>> > >
>>>>> > > We are using Samza (0.12.0) in about 2 dozen jobs implementing
>>>>> > > several processing pipelines. We have also begun a significant
>>>>> > > move of other services within our company to Docker/Kubernetes.
>>>>> > > Right now our Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce"
>>>>> > > jobs
>>>>> > (many reporting and other batch processing jobs). We would really
>>>>> > like to move our stream processing off of Hadoop/Yarn and onto Kubernetes.
>>>>> > >
>>>>> > > When I just read about some of the new progress in .13 and .14
>>>>> > > I got really excited! We would love to have our jobs run as
>>>>> > > simple libraries in our own JVM, and use the Kafka
>>>>> > > High-Level-Consumer for partition
>>>>> > distribution and such. This would let us "dockerfy" our
>>>>> > application and run/scale in kubernetes.
>>>>> > >
>>>>> > > However as I read it, this new deployment model is ONLY for the
>>>>> > > new(er) High Level API, correct? Is there a plan and/or
>>>>> > > resources for adapting this back to existing low-level tasks ?
>>>>> > > How complicated of a
>>>>> > task is that? Do I have any other options to make this transition
>>>>> easier?
>>>>> > >
>>>>> > > Thanks in advance.
>>>>> > > Thunder
>>>>> >
>>>>> >
>>>>> >
>>>>> > --
>>>>> > Jagadish V,
>>>>> > Graduate Student,
>>>>> > Department of Computer Science,
>>>>> > Stanford University
>>>>> >
>>>>> >
>>>>> >
>>>>> > --
>>>>> > Jagadish V,
>>>>> > Graduate Student,
>>>>> > Department of Computer Science,
>>>>> > Stanford University
>>>>> >
>>>>>
>>>>>
>>>>
>>>>

