samza-dev mailing list archives

From Thunder Stumpges <tstump...@ntent.com>
Subject RE: Old style "low level" Tasks with alternative deployment model(s)
Date Tue, 20 Mar 2018 22:05:01 GMT
I'm out this weekend, but would be glad to collaborate with Tom to get some documentation together. I'll let him take the lead on it, and then maybe I can contribute my information on "running low-level API jobs in standalone mode" and tips on Kubernetes (which is really just like any other Java application deployment once you get the job running stand-alone).

Thanks everyone, I have my job(s) running successfully in K8s at this time!

-Thunder

-----Original Message-----
From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com] 
Sent: Tuesday, March 20, 2018 11:36
To: Tom Davis <tom@recursivedream.com>
Cc: Prateek Maheshwari <prateekmi2@gmail.com>; dev@samza.apache.org; Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
Subject: Re: Old style "low level" Tasks with alternative deployment model(s)

Hi Tom,

>> Happy to put something together this weekend as well.

Great, can't wait!!

>> What format would that be best in?

You can open a PR in markdown format.

Here's an example PR for Kinesis:
https://github.com/apache/samza/pull/384/files/
Here's how it looks and renders in our web-page:
https://samza.apache.org/learn/documentation/0.14/aws/kinesis.html

Best,
Jagdish

On Tue, Mar 20, 2018 at 11:24 AM, Tom Davis <tom@recursivedream.com> wrote:

> What format would that be best in? Happy to put something together 
> this weekend as well.
>
> Jagadish Venkatraman <jagadish1989@gmail.com> writes:
>
> Hi Thunder,
>>
>> Thank you for the PR. Really nice work!
>>
>> Since you have a working implementation on K8s, would you be willing 
>> to contribute a short tutorial / a post on this? We'll be sure to 
>> feature it in the official Samza web-site at http://samza.apache.org/.
>>
>> It'd be a great addition to the Samza community to have a section on 
>> K8s integration! There have been multiple prior asks on this, and 
>> your learnings would be super-helpful.
>>
>> Best,
>> Jagdish
>>
>>
>>
>> On Tue, Mar 20, 2018 at 10:46 AM, Prateek Maheshwari <prateekmi2@gmail.com> wrote:
>>
>>> Glad you were able to figure it out; that was very confusing. Thanks for
>>> the fix too.
>>>
>>> - Prateek
>>>
>>> On Mon, Mar 19, 2018 at 9:58 PM, Thunder Stumpges <tstumpges@ntent.com> wrote:
>>>
>>>> And that last issue was mine. My setting override was not picked up, and
>>>> it was using GroupByContainerCount instead.
>>>> -Thanks,
>>>> Thunder
>>>>
>>>>
>>>> -----Original Message-----
>>>> From: Thunder Stumpges
>>>> Sent: Monday, March 19, 2018 20:58
>>>> To: dev@samza.apache.org
>>>> Cc: Jagadish Venkatraman <jagadish1989@gmail.com>; 
>>>> tom@recursivedream.com; Yipan@linkedin.com; Yi Pan 
>>>> <nickpan47@gmail.com>
>>>> Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>>>
>>>> Well, I figured it out. My specific issue was due to a simple
>>>> dependency problem: I had pulled in an older version of the
>>>> Jackson-mapper library. However, the code was throwing NoSuchMethodError
>>>> (an Error rather than an Exception), which was being silently dropped.
>>>> I created a pull request to handle any Throwable in ScheduleAfterDebounceTime.
>>>> https://github.com/apache/samza/pull/450
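The silent failure is easy to reproduce with plain `java.util.concurrent`: when a task run by a `ScheduledExecutorService` throws, the throwable is captured in the returned `Future`, and nothing is logged unless someone calls `get()`. Below is a minimal sketch of a wrapper that catches `Throwable` so that an `Error` such as `NoSuchMethodError` is reported rather than lost. It is not the code from PR #450; `SafeScheduler`, `wrap` and the error callback are hypothetical names, and `System.err` stands in for a real logger.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class SafeScheduler {
  // Wraps an action so that Errors (e.g. NoSuchMethodError) are reported as well as
  // Exceptions. A catch (Exception e) block would let an Error escape into the
  // executor, which stores it in the Future and logs nothing.
  static Runnable wrap(String actionName, Runnable action, Consumer<Throwable> onError) {
    return () -> {
      try {
        action.run();
      } catch (Throwable t) {
        System.err.println("Execution of action: " + actionName + " failed: " + t);
        onError.accept(t);
      }
    };
  }

  public static void main(String[] args) throws Exception {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    Runnable failing = () -> {
      throw new NoSuchMethodError("simulated jar mismatch");
    };

    // Unwrapped: nothing is printed; the Error only surfaces if someone calls get().
    ScheduledFuture<?> unwrapped = executor.schedule(failing, 10, TimeUnit.MILLISECONDS);
    try {
      unwrapped.get();
    } catch (ExecutionException e) {
      System.out.println("Visible only via get(): " + e.getCause());
    }

    // Wrapped: the failure is logged and handed to the callback immediately.
    AtomicReference<Throwable> reported = new AtomicReference<>();
    executor.schedule(wrap("OnProcessorChange", failing, reported::set), 10, TimeUnit.MILLISECONDS).get();
    System.out.println("Reported: " + reported.get());
    executor.shutdown();
  }
}
```

Catching `Throwable` is usually discouraged, but at the top of a background task it is often the only place where such a failure can still be logged.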
>>>>
>>>> I'm now running into an issue with the generation of the JobModel
>>>> and the ProcessorId. The ZkJobCoordinator has a ProcessorId that is
>>>> a GUID, but when the GroupByContainerIds class (my TaskNameGrouper)
>>>> creates the ContainerModels, it uses the ContainerId (a numeric value:
>>>> 0, 1, 2, etc.) as the ProcessorId (~ line 105). As a result, the
>>>> JobModel that is generated and published immediately causes the
>>>> processor to quit with this message:
>>>>
>>>> INFO  o.apache.samza.zk.ZkJobCoordinator - New JobModel does not contain pid=38c637bf-9c2b-4856-afc4-5b1562711cfb. Stopping this processor.
>>>>
>>>> I was assuming I should be using GroupByContainerIds as my 
>>>> TaskNameGrouper. I don't see any other promising implementations. 
>>>> Am I just missing something?
>>>>
>>>> Thanks,
>>>> Thunder
>>>>
>>>> JobModel
>>>> {
>>>>   "config" : {
>>>>   ...
>>>>   },
>>>>   "containers" : {
>>>>     "0" : {
>>>>       "tasks" : {
>>>>         "Partition 0" : {
>>>>           "task-name" : "Partition 0",
>>>>           "system-stream-partitions" : [ {
>>>>             "system" : "kafka",
>>>>             "partition" : 0,
>>>>             "stream" : "test_topic1"
>>>>           }, {
>>>>             "system" : "kafka",
>>>>             "partition" : 0,
>>>>             "stream" : "test_topic2"
>>>>           } ],
>>>>           "changelog-partition" : 0
>>>>         },
>>>>         "Partition 1" : {
>>>>           "task-name" : "Partition 1",
>>>>           "system-stream-partitions" : [ {
>>>>             "system" : "kafka",
>>>>             "partition" : 1,
>>>>             "stream" : "test_topic1"
>>>>           }, {
>>>>             "system" : "kafka",
>>>>             "partition" : 1,
>>>>             "stream" : "test_topic2"
>>>>           } ],
>>>>           "changelog-partition" : 1
>>>>         }
>>>>       },
>>>>       "container-id" : 0,
>>>>       "processor-id" : "0"
>>>>     }
>>>>   },
>>>>   "max-change-log-stream-partitions" : 2,
>>>>   "all-container-locality" : {
>>>>     "0" : null
>>>>   }
>>>> }
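The log message amounts to a membership test: the leader publishes a JobModel whose `containers` map is keyed by processor ID, and a processor stops if its own ID is not among the keys. In the JobModel above the only key is `"0"`, so a processor with a UUID-style pid cannot find itself. As the follow-up in this thread shows, the grouper actually in use was GroupByContainerCount, which produces numeric IDs. The sketch below illustrates the difference with a hypothetical round-robin grouper (`ProcessorIdGrouping` is not a Samza class, and Samza's real groupers may assign tasks differently), keyed either by numeric IDs or by the processors' own IDs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ProcessorIdGrouping {
  // Hypothetical round-robin grouper: task i goes to containerIds[i % n].
  // The keys of the returned map play the role of the JobModel "containers" keys.
  static Map<String, List<String>> groupRoundRobin(List<String> taskNames, List<String> containerIds) {
    if (containerIds.isEmpty()) {
      throw new IllegalArgumentException("At least one container ID is required");
    }
    Map<String, List<String>> containers = new LinkedHashMap<>();
    for (String id : containerIds) {
      containers.put(id, new ArrayList<>());
    }
    for (int i = 0; i < taskNames.size(); i++) {
      containers.get(containerIds.get(i % containerIds.size())).add(taskNames.get(i));
    }
    return containers;
  }

  // Numeric IDs "0".."count-1", like the "0" key in the JobModel above.
  static List<String> numericIds(int count) {
    List<String> ids = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      ids.add(String.valueOf(i));
    }
    return ids;
  }

  // Conceptually, the check behind "New JobModel does not contain pid=...".
  static boolean containsProcessor(Map<String, List<String>> containers, String pid) {
    return containers.containsKey(pid);
  }

  public static void main(String[] args) {
    List<String> tasks = Arrays.asList("Partition 0", "Partition 1");
    String pid = "38c637bf-9c2b-4856-afc4-5b1562711cfb";

    Map<String, List<String>> byNumber = groupRoundRobin(tasks, numericIds(1));
    Map<String, List<String>> byPid = groupRoundRobin(tasks, Arrays.asList(pid));

    System.out.println(byNumber + " contains pid: " + containsProcessor(byNumber, pid));
    System.out.println(byPid + " contains pid: " + containsProcessor(byPid, pid));
  }
}
```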
>>>>
>>>> -----Original Message-----
>>>> From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>>> Sent: Friday, March 16, 2018 18:21
>>>> To: dev@samza.apache.org
>>>> Cc: Jagadish Venkatraman <jagadish1989@gmail.com>; 
>>>> tom@recursivedream.com; Yipan@linkedin.com; Yi Pan 
>>>> <nickpan47@gmail.com>
>>>> Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>>>
>>>> Attached. I don't see any threads actually running this code, which
>>>> is odd.
>>>>
>>>> There's my main thread, which is waiting for the whole thing to
>>>> finish, and the "debounce-thread-0" (which logged the other surrounding
>>>> messages below), which has this:
>>>>
>>>> "debounce-thread-0" #18 daemon prio=5 os_prio=0 tid=0x00007fa0fd719800 nid=0x21 waiting on condition [0x00007fa0d0d45000]
>>>>    java.lang.Thread.State: WAITING (parking)
>>>>         at sun.misc.Unsafe.park(Native Method)
>>>>         - parking to wait for  <0x00000006f166e350> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>>>>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>    Locked ownable synchronizers:
>>>>         - None
>>>>
>>>> Thanks for having a look.
>>>> Thunder
>>>>
>>>>
>>>> -----Original Message-----
>>>> From: Prateek Maheshwari [mailto:prateekmi2@gmail.com]
>>>> Sent: Friday, March 16, 2018 17:02
>>>> To: dev@samza.apache.org
>>>> Cc: Jagadish Venkatraman <jagadish1989@gmail.com>; 
>>>> tom@recursivedream.com; Yipan@linkedin.com; Yi Pan 
>>>> <nickpan47@gmail.com>
>>>> Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>>>>
>>>> Hi Thunder,
>>>>
>>>> Can you please take and attach a thread dump with this?
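If `jstack` is not available inside the container image, a dump can also be produced from within the JVM itself. Below is a minimal sketch using only `Thread.getAllStackTraces()`; the `ThreadDumper` class is hypothetical, and its output is simpler than `jstack`'s (for example, it omits lock and monitor information).

```java
import java.util.Map;

final class ThreadDumper {
  // Builds a jstack-like text dump of all live threads using only the JDK.
  static String dump() {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
      Thread t = e.getKey();
      sb.append('"').append(t.getName()).append('"')
          .append(t.isDaemon() ? " daemon" : "")
          .append(" state=").append(t.getState()).append('\n');
      for (StackTraceElement frame : e.getValue()) {
        sb.append("        at ").append(frame).append('\n');
      }
      sb.append('\n');
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.print(dump());
  }
}
```

Calling `dump()` from a debug endpoint or a signal-triggered hook is one way to capture the state of a container that appears hung.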
>>>>
>>>> Thanks,
>>>> Prateek
>>>>
>>>> On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges <tstumpges@ntent.com> wrote:
>>>>
>>>> > It appears it IS hung while serializing the JobModel... very strange!
>>>> > I added some debug statements around the calls:
>>>> >
>>>> >       LOG.debug("Getting object mapper to serialize job model"); // this IS printed
>>>> >       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>>>> >       LOG.debug("Serializing job model"); // this IS printed
>>>> >       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>>>> >       LOG.info("jobModelAsString=" + jobModelStr); // this is NOT printed!
>>>> >
>>>> > Another thing I noticed is that "getObjectMapper" actually 
>>>> > creates the object mapper twice!
>>>> >
>>>> > 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Getting object mapper to serialize job model
>>>> > 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
>>>> > 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes and mixins
>>>> > 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
>>>> > 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes  and mixins
>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Serializing job model
>>>> >
>>>> > Could this ObjectMapper be a singleton? I see there is a private 
>>>> > static instance, but getObjectMapper creates a new one every time...
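On the singleton question: Jackson's `ObjectMapper` is generally documented as thread-safe once it has been fully configured, so one shared instance is the usual approach. Below is a minimal sketch of the initialization-on-demand holder idiom. To keep it compilable without Jackson on the classpath, `ExpensiveMapper` is a hypothetical stand-in for the configured mapper, and `MapperHolderDemo` is not Samza's `SamzaObjectMapper`.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class MapperHolderDemo {
  // Stand-in for an expensive-to-build object such as a configured ObjectMapper.
  static final class ExpensiveMapper {
    static final AtomicInteger CREATED = new AtomicInteger();

    ExpensiveMapper() {
      CREATED.incrementAndGet(); // count constructions to show it is built only once
    }
  }

  // The JVM initializes Holder lazily and thread-safely on first access,
  // so INSTANCE is built exactly once without explicit locking.
  private static final class Holder {
    static final ExpensiveMapper INSTANCE = new ExpensiveMapper();
  }

  static ExpensiveMapper getMapper() {
    return Holder.INSTANCE;
  }

  public static void main(String[] args) {
    ExpensiveMapper a = getMapper();
    ExpensiveMapper b = getMapper();
    System.out.println("same instance: " + (a == b) + ", constructions: " + ExpensiveMapper.CREATED.get());
  }
}
```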
>>>> >
>>>> > Anyway, then it takes off to serialize the job model and never 
>>>> > comes back...
>>>> >
>>>> > Hoping someone has some idea here... the implementation for this 
>>>> > mostly comes from Jackson-mapper-asl, and I have the version that 
>>>> > is linked in the
>>>> > 0.14.0 tag:
>>>> > |    |    |    +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
>>>> > |    |    |    |    \--- org.codehaus.jackson:jackson-core-asl:1.9.13
>>>> >
>>>> > Thanks!
>>>> > Thunder
>>>> >
>>>> > -----Original Message-----
>>>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>>> > Sent: Friday, March 16, 2018 15:29
>>>> > To: dev@samza.apache.org; Jagadish Venkatraman <jagadish1989@gmail.com>
>>>> > Cc: tom@recursivedream.com; Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>>>> > Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>>> >
>>>> > So, my investigation starts at StreamProcessor.java. Line 294 in method
>>>> > onNewJobModel() logs an INFO message that it's starting a container.
>>>> > This message never appears.
>>>> >
>>>> > I see that ZkJobCoordinator calls onNewJobModel from its 
>>>> > onNewJobModelConfirmed method which also logs an info message 
>>>> > stating "version X of the job model got confirmed". I never see 
>>>> > this message either, so I go up the chain some more.
>>>> >
>>>> > I DO see:
>>>> >
>>>> > 2018-03-16 21:43:58 logback 20498 [ZkClient-EventThread-13-10.0.127.114:2181] INFO  o.apache.samza.zk.ZkJobCoordinator - ZkJobCoordinator::onBecomeLeader - I became the leader!
>>>> > And
>>>> > 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO o.apache.samza.zk.ZkJobCoordinator - pid=91e07d20-ae33-4156-a5f3-534a95642133Generated new Job Model. Version = 1
>>>> >
>>>> > Which led me to method onDoProcessorChange line 210. I see that 
>>>> > line, but not the line below " Published new Job Model. Version 
>>>> > =" so something in here is not completing:
>>>> >
>>>> >     LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
>>>> >
>>>> >     // Publish the new job model
>>>> >     zkUtils.publishJobModel(nextJMVersion, jobModel);
>>>> >
>>>> >     // Start the barrier for the job model update
>>>> >     barrier.create(nextJMVersion, currentProcessorIds);
>>>> >
>>>> >     // Notify all processors about the new JobModel by updating JobModel Version number
>>>> >     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
>>>> >
>>>> >     LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
>>>> >
>>>> > As I mentioned, after the line "Generated new Job Model. Version = 1"
>>>> > I just get repeated ZK ping responses... no more application logging.
>>>> >
>>>> > The very next thing that's run is zkUtils.publishJobModel() which 
>>>> > only has two lines before another log statement (which I don't see):
>>>> >
>>>> >   public void publishJobModel(String jobModelVersion, JobModel jobModel) {
>>>> >     try {
>>>> >       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>>>> >       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>>>> >       LOG.info("jobModelAsString=" + jobModelStr);
>>>> >       ...
>>>> >
>>>> > Could it really be getting hung up on one of these two lines? 
>>>> > (seems like it must be, but I don't see anything there that seems 
>>>> > like it would just hang). I'll keep troubleshooting, maybe add 
>>>> > some more debug logging and try again.
>>>> >
>>>> > Thanks for any guidance you all might have.
>>>> > -Thunder
>>>> >
>>>> >
>>>> > -----Original Message-----
>>>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>>> > Sent: Friday, March 16, 2018 14:43
>>>> > To: dev@samza.apache.org; Jagadish Venkatraman <jagadish1989@gmail.com>
>>>> > Cc: tom@recursivedream.com; Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>>>> > Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>>> >
>>>> > Well I have my stand-alone application in docker and running in 
>>>> > kubernetes. I think something isn't wired up all the way though, 
>>>> > because my task never actually gets invoked. I see no errors, 
>>>> > however I'm not getting the usual startup logs (checking existing 
>>>> > offsets, "entering run loop"...) My logs look like this:
>>>> >
>>>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Verifying properties
>>>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property client.id is overridden to samza_admin-test_stream_task-1
>>>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to test-kafka-kafka.test-svc:9092
>>>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
>>>> > 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO kafka.client.ClientUtils$ - Fetching metadata from broker BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation id 0 for 1 topic(s) Set(dev_k8s.samza.test.topic)
>>>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 179680 (requested -1), SO_SNDBUF = 102400 (requested 102400), connectTimeoutMs = 30000.
>>>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO kafka.producer.SyncProducer - Connected to test-kafka-kafka.test-svc:9092 for producing
>>>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO kafka.producer.SyncProducer - Disconnecting from test-kafka-kafka.test-svc:9092
>>>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG kafka.client.ClientUtils$ - Successfully fetched metadata for 1 topic(s) Set(dev_k8s.samza.test.topic)
>>>> > 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@1a7158cc has grouped the SystemStreamPartitions into 10 tasks with the following taskNames: [Partition 1, Partition 0, Partition 3, Partition 2, Partition 5, Partition 4, Partition 7, Partition 6, Partition 9, Partition 8]
>>>> > 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 0 is being assigned changelog partition 0.
>>>> > 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 1 is being assigned changelog partition 1.
>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 2 is being assigned changelog partition 2.
>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 3 is being assigned changelog partition 3.
>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 4 is being assigned changelog partition 4.
>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 5 is being assigned changelog partition 5.
>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 6 is being assigned changelog partition 6.
>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 7 is being assigned changelog partition 7.
>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 8 is being assigned changelog partition 8.
>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 9 is being assigned changelog partition 9.
>>>> > 2018-03-16 21:05:55 logback 50838 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null serverPath:null finished:false header:: 23,4  replyHeader:: 23,14024,0  request:: '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/JobModelGeneration/jobModelVersion,T  response:: ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878}
>>>> > 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO o.apache.samza.zk.ZkJobCoordinator - pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated new Job Model. Version = 1
>>>> > 2018-03-16 21:06:05 logback 60848 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms
>>>> > 2018-03-16 21:06:15 logback 70856 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 1ms
>>>> > 2018-03-16 21:06:25 logback 80865 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms ...
>>>> >
>>>> > The zk ping responses continue every 10 seconds, but no other 
>>>> > activity or messages occur.
>>>> > It looks like it gets as far as confirming the JobModel and 
>>>> > grouping the partitions, but nothing actually starts up.
>>>> >
>>>> > Any ideas?
>>>> > Thanks in advance!
>>>> > Thunder
>>>> >
>>>> >
>>>> > -----Original Message-----
>>>> > From: Thunder Stumpges [mailto:tstumpges@ntent.com]
>>>> > Sent: Thursday, March 15, 2018 16:35
>>>> > To: Jagadish Venkatraman <jagadish1989@gmail.com>
>>>> > Cc: dev@samza.apache.org; tom@recursivedream.com; 
>>>> > Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>>>> > Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>>> >
>>>> > Thanks a lot for the info. I have something basically working at
>>>> > this point! I have not integrated it with Docker or Kubernetes
>>>> > yet, but it does run from my local machine.
>>>> >
>>>> > I have determined that LocalApplicationRunner does NOT do config
>>>> > rewriting. I had to write my own little “StandAloneApplicationRunner”
>>>> > that handles the “main” entrypoint. It parses the command line using
>>>> > CommandLine, loads config from ConfigFactory, and performs
>>>> > rewriting before creating the new instance of
>>>> > LocalApplicationRunner. This is all my StandAloneApplicationRunner contains:
>>>> >
>>>> >
>>>> > object StandAloneSamzaRunner extends App with LazyLogging {
>>>> >
>>>> >   // parse command line args just like JobRunner.
>>>> >   val cmdline = new ApplicationRunnerCommandLine
>>>> >   val options = cmdline.parser.parse(args: _*)
>>>> >   val config = cmdline.loadConfig(options)
>>>> >
>>>> >   val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
>>>> >   runner.runTask()
>>>> >   runner.waitForFinish()
>>>> > }
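For readers unfamiliar with rewriters: Samza reads `job.config.rewriters` as a comma-separated list of names and applies each named rewriter (configured via `job.config.rewriter.<name>.class`) in order, which is roughly what `Util.rewriteConfig` does in the runner above. The sketch below models that fold over a plain `Map`, using a hypothetical in-memory registry instead of reflective class loading so it runs without Samza on the classpath. `RewriteChain`, `Rewriter` and `overlay` are illustrative names, not Samza APIs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class RewriteChain {
  // Hypothetical stand-in for a config rewriter: (name, config) -> new config.
  interface Rewriter {
    Map<String, String> rewrite(String name, Map<String, String> config);
  }

  // Applies the rewriters listed in job.config.rewriters, left to right.
  static Map<String, String> rewrite(Map<String, String> config, Map<String, Rewriter> registry) {
    String names = config.get("job.config.rewriters");
    if (names == null || names.trim().isEmpty()) {
      return config;
    }
    Map<String, String> current = config;
    for (String raw : names.split(",")) {
      String name = raw.trim();
      Rewriter rewriter = registry.get(name);
      if (rewriter == null) {
        throw new IllegalArgumentException("No rewriter registered for " + name);
      }
      current = rewriter.rewrite(name, current);
    }
    return current;
  }

  // Example rewriter: lays a fixed set of overrides (e.g. from env vars) on top.
  static Rewriter overlay(Map<String, String> overrides) {
    return (name, config) -> {
      Map<String, String> out = new HashMap<>(config);
      out.putAll(overrides);
      return Collections.unmodifiableMap(out);
    };
  }

  public static void main(String[] args) {
    Map<String, String> base = new HashMap<>();
    base.put("job.config.rewriters", "overlay");
    base.put("task.name.grouper.factory",
        "org.apache.samza.container.grouper.task.GroupByContainerCountFactory");

    Map<String, Rewriter> registry = new HashMap<>();
    registry.put("overlay", overlay(Collections.singletonMap("task.name.grouper.factory",
        "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory")));

    System.out.println(rewrite(base, registry).get("task.name.grouper.factory"));
  }
}
```

Because each rewriter sees the output of the previous one, the order in `job.config.rewriters` matters when two rewriters touch the same key.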
>>>> >
>>>> > The only config settings I needed to change to use this runner were
>>>> > these (easily configured thanks to our central Consul config system and
>>>> > our rewriter):
>>>> >
>>>> > # use the ZK based job coordinator
>>>> > job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
>>>> > # need to use GroupByContainerIds instead of GroupByContainerCount
>>>> > task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
>>>> > # ZKJC config
>>>> > job.coordinator.zk.connect=<our_zk_connection>
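Since the eventual root cause in this thread was an override that silently failed to apply (the job fell back to GroupByContainerCount), it may be worth checking the effective config before starting the runner. Below is a hypothetical fail-fast check over a plain `Map`; the `StandaloneConfigCheck` name is illustrative, and the expected values are simply the three settings listed above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StandaloneConfigCheck {
  // Expected values for the settings above; a null value means "must be present".
  static final Map<String, String> EXPECTED = new HashMap<>();

  static {
    EXPECTED.put("job.coordinator.factory", "org.apache.samza.zk.ZkJobCoordinatorFactory");
    EXPECTED.put("task.name.grouper.factory",
        "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
    EXPECTED.put("job.coordinator.zk.connect", null);
  }

  // Returns one human-readable problem per missing or unexpected setting.
  static List<String> problems(Map<String, String> config) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, String> e : EXPECTED.entrySet()) {
      String actual = config.get(e.getKey());
      if (actual == null || actual.trim().isEmpty()) {
        out.add(e.getKey() + " is not set");
      } else if (e.getValue() != null && !e.getValue().equals(actual.trim())) {
        out.add(e.getKey() + " is " + actual + ", expected " + e.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("job.coordinator.factory", "org.apache.samza.zk.ZkJobCoordinatorFactory");
    config.put("job.coordinator.zk.connect", "localhost:2181"); // placeholder address
    List<String> issues = problems(config);
    if (!issues.isEmpty()) {
      // Failing here is cheaper than debugging a JobModel with the wrong processor IDs.
      System.err.println("Refusing to start: " + issues);
    }
  }
}
```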
>>>> >
>>>> > I did run into one potential problem: as you see above, I start
>>>> > the task using runTask() and then, to prevent my main method from
>>>> > returning, call waitForFinish(). The first time I ran it, the job
>>>> > itself failed because I had forgotten to override the task grouper,
>>>> > and the container count was pulled from our staging environment.
>>>> > Some failures are logged and it appears the JobCoordinator fails,
>>>> > but waitForFinish never returns. The stack trace and continuation of
>>>> > the log are below:
>>>> >
>>>> > 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: OnProcessorChange failed.
>>>> > java.lang.IllegalArgumentException: Your container count (4) is larger than your task count (2). Can't have containers with nothing to do, so aborting.
>>>> >        at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:212)
>>>> >        at org.apache.samza.container.grouper.task.GroupByContainerCount.group(GroupByContainerCount.java:62)
>>>> >        at org.apache.samza.container.grouper.task.TaskNameGrouper.group(TaskNameGrouper.java:56)
>>>> >        at org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:266)
>>>> >        at org.apache.samza.coordinator.JobModelManager.readJobModel(JobModelManager.scala)
>>>> >        at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJobCoordinator.java:306)
>>>> >        at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:197)
>>>> >        at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:318)
>>>> >        at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134)
>>>> >        at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>>>> >        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>>>> >        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>>>> >        at java.util.concurrent.FutureTask.run(FutureTask.java)
>>>> >        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>> >        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>> >        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> >        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> >        at java.lang.Thread.run(Thread.java:745)
>>>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG o.a.samza.processor.StreamProcessor - Container is not instantiated yet.
>>>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG org.I0Itec.zkclient.ZkClient - Closing ZkClient...
>>>> > 2018-03-15 22:34:32 logback 77789 [ZkClient-EventThread-15-10.0.127.114:2181] INFO  org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
>>>> >
>>>> > And then the application continues on with metric reporters and
>>>> > other debug logging (not actually running the task, though).
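The hang is consistent with a wait that is only released on the success path. As a general pattern (a hypothetical sketch, not LocalApplicationRunner's actual implementation), a runner can release its completion latch from both the shutdown and the failure callbacks, so a coordinator failure also unblocks the waiting main thread and surfaces the cause. `FinishSignal` and its methods are illustrative names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class FinishSignal {
  private final CountDownLatch done = new CountDownLatch(1);
  private final AtomicReference<Throwable> failure = new AtomicReference<>();

  // Called by the processor on clean shutdown.
  void onShutdown() {
    done.countDown();
  }

  // Called on coordinator/processor failure: records the first cause and still releases waiters.
  void onFailure(Throwable t) {
    failure.compareAndSet(null, t);
    done.countDown();
  }

  // Blocks until shutdown or failure; rethrows the failure so main() can exit non-zero.
  void waitForFinish(long timeout, TimeUnit unit) throws Exception {
    if (!done.await(timeout, unit)) {
      throw new IllegalStateException("Timed out waiting for the processor to finish");
    }
    Throwable t = failure.get();
    if (t != null) {
      throw new Exception("Processor failed", t);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    FinishSignal signal = new FinishSignal();
    Thread worker = new Thread(() -> signal.onFailure(new IllegalArgumentException(
        "Your container count (4) is larger than your task count (2).")));
    worker.start();
    try {
      signal.waitForFinish(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      System.out.println("waitForFinish returned with: " + e.getCause().getMessage());
    }
    worker.join();
  }
}
```

In a Kubernetes deployment, letting `main` exit non-zero on failure allows the pod to be restarted instead of sitting idle with only ZK pings in the log.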
>>>> >
>>>> > Thanks in advance for the guidance; this has been easier than I imagined!
>>>> > I’ll report back when I get more of the Dockerization/Kubernetes
>>>> > running and test it a bit more.
>>>> > Cheers,
>>>> > Thunder
>>>> >
>>>> >
>>>> > From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com]
>>>> > Sent: Thursday, March 15, 2018 14:46
>>>> > To: Thunder Stumpges <tstumpges@ntent.com>
>>>> > Cc: dev@samza.apache.org; tom@recursivedream.com; 
>>>> > Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>>>> > Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>>>> >
>>>> > >> Thanks for the info on the tradeoffs. That makes a lot of sense. I am
>>>> > >> on-board with using ZkJobCoordinator, sounds like some good
>>>> > >> benefits over just the Kafka high-level consumer.
>>>> >
>>>> > This certainly looks like the simplest alternative.
>>>> >
>>>> > For your other questions, please find my answers inline.
>>>> >
>>>> > >> Q1: If I use LocalApplicationRunner, it does not use
>>>> > >> "ProcessJobFactory" (or any StreamJob or *Job classes), correct?
>>>> >
>>>> > Your understanding is correct. It directly instantiates the
>>>> > StreamProcessor, which in turn creates and runs the SamzaContainer.
>>>> >
>>>> > >> Q2: If I use LocalApplicationRunner, I will need to code myself the
>>>> > >> loading and rewriting of the Config that is currently handled by
>>>> > >> JobRunner, correct?
>>>> >
>>>> > I don't think you'll need to do this. IIUC, the 
>>>> > LocalApplicationRunner should automatically invoke rewriters and do the right thing.
>>>> >
>>>> > >> Q3: Do I need to also handle coordinator stream(s) and storing of
>>>> > >> config that is done in JobRunner (I don’t think so as the ?
>>>> >
>>>> > I don't think this is necessary either. The creation of the
>>>> > coordinator stream and persisting configuration happens in the
>>>> > LocalApplicationRunner (more specifically in StreamManager#createStreams).
>>>> >
>>>> > >> Q4: Where/how do I specify the Container ID for each instance? Is there
>>>> > >> a config setting that I can pass (or pull from an env variable
>>>> > >> and add to the config)? I am assuming it is my responsibility to
>>>> > >> ensure that each instance is started with a unique container ID..?
>>>> >
>>>> > Nope. If you are using the ZkJobCoordinator, you don't need to
>>>> > worry about assigning IDs for each instance. The framework will
>>>> > automatically take care of generating IDs and reaching consensus
>>>> > by electing a leader. If you are curious, please take a look at
>>>> > implementations of the ProcessorIdGenerator interface.
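As a rough illustration of why no per-instance configuration is needed, each instance can simply draw a random UUID at startup, which matches the shape of the `pid=` values in the logs earlier in this thread. The interface below is a local stand-in for `ProcessorIdGenerator` (the real interface's signature may differ), and `UuidProcessorIds` is a hypothetical name.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

final class UuidProcessorIds {
  // Local stand-in for a processor-ID generator; not Samza's actual interface.
  interface IdGenerator {
    String generateProcessorId(Map<String, String> config);
  }

  // Each call yields a random (type 4) UUID, so replicas get distinct IDs
  // without coordination or per-pod configuration.
  static final IdGenerator RANDOM_UUID = config -> UUID.randomUUID().toString();

  public static void main(String[] args) {
    Set<String> ids = new HashSet<>();
    for (int i = 0; i < 3; i++) {
      ids.add(RANDOM_UUID.generateProcessorId(Collections.emptyMap()));
    }
    System.out.println(ids);
  }
}
```

The trade-off is that IDs change on every restart, which is fine for ZK-based leader election but means an ID cannot be used to pin state to a specific pod.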
>>>> >
>>>> > Please let us know should you have further questions!
>>>> >
>>>> > Best,
>>>> > Jagdish
>>>> >
>>>> > On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges <tstumpges@ntent.com> wrote:
>>>> >
>>>> > Thanks for the info on the tradeoffs. That makes a lot of sense. 
>>>> > I am on-board with using ZkJobCoordinator, sounds like some good 
>>>> > benefits over just the Kafka high-level consumer.
>>>> >
>>>> >
>>>> >
>>>> > To that end, I have made some notes on possible approaches based
>>>> > on the previous thread and on my look into the code. I’d love to get
>>>> > feedback.
>>>> >
>>>> >
>>>> >
>>>> > Approach 1. Configure jobs to use “ProcessJobFactory” and run 
>>>> > instances of the job using run-job.sh or using JobRunner directly.
>>>> >
>>>> > I don’t think this makes sense from what I can see for a few reasons:
>>>> >
>>>> >   *   JobRunner is concerned with stuff I don't *think* we need:
>>>> >
>>>> >      *   coordinatorSystemProducer|Consumer,
>>>> >      *   writing/reading the configuration to the coordinator streams
>>>> >
>>>> >   *   ProcessJobFactory hard-codes the ID to “0” so I don’t think that
>>>> > will work for multiple instances.
>>>> >
>>>> >
>>>> >
>>>> > Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and invoke
>>>> > LocalApplicationRunner.runTask()
>>>> >
>>>> >
>>>> >
>>>> >     Q1: If I use LocalApplicationRunner, it does not use
>>>> > "ProcessJobFactory" (or any StreamJob or *Job classes), correct?
>>>> >
>>>> >     Q2: If I use LocalApplicationRunner, I will need to code 
>>>> > myself the loading and rewriting of the Config that is currently 
>>>> > handled by JobRunner, correct?
>>>> >
>>>> >     Q3: Do I need to also handle coordinator stream(s) and 
>>>> > storing of config that is done in JobRunner (I don’t think so as the ?
>>>> >
>>>> >     Q4: Where/how do I specify the Container ID for each
>>>> > instance? Is there a config setting that I can pass (or pull
>>>> > from an env variable and add to the config)? I am assuming it is
>>>> > my responsibility to ensure that each instance is started with a unique container ID..?
>>>> >
>>>> > I am getting started on the above (Approach 2.), and looking 
>>>> > closer at the code so I may have my own answers to my questions, 
>>>> > but figured I should go ahead and ask now anyway. Thanks!
>>>> >
>>>> > -Thunder
>>>> >
>>>> >
>>>> > From: Jagadish Venkatraman [mailto:jagadish1989@gmail.com]
>>>> > Sent: Thursday, March 15, 2018 1:41
>>>> > To: dev@samza.apache.org; Thunder Stumpges <tstumpges@ntent.com>; tom@recursivedream.com
>>>> > Cc: Yipan@linkedin.com; Yi Pan <nickpan47@gmail.com>
>>>> > Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>>>> >
>>>> > >> You are correct that this is focused on the higher-level API
>>>> > >> but doesn't preclude using the lower-level API. I was at the same
>>>> > >> point you were not long ago, in fact, and had a very productive
>>>> > >> conversation on the list
>>>> >
>>>> > Thanks Tom for linking the thread, and I'm glad that you were 
>>>> > able to get Kubernetes integration working with Samza.
>>>> >
>>>> > >> If it is helpful for everyone, once I get the low-level API +
>>>> > >> ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate
>>>> > >> an additional sample for hello-samza.
>>>> >
>>>> > @Thunder Stumpges:
>>>> > We'd be thrilled to receive your contribution. Examples, demos,
>>>> > tutorials, etc. contribute a great deal to improving the ease of use
>>>> > of Apache Samza. I'm happy to shepherd design discussions/code reviews
>>>> > in the open source, including answering any questions you may have.
>>>> >
>>>> >
>>>> > >> One thing I'm still curious about, is what are the drawbacks 
>>>> > >> or complexities of leveraging the Kafka High-level consumer + 
>>>> > >> PassthroughJobCoordinator in a stand-alone setup like this? We 
>>>> > >> do have Zookeeper (because of kafka) so I think either would 
>>>> > >> work. The Kafka High-level consumer comes with other nice 
>>>> > >> tools for monitoring offsets, lag, etc
>>>> >
>>>> >
>>>> > @Thunder Stumpges:
>>>> >
>>>> > Samza uses a "Job-Coordinator" to assign your input partitions
>>>> > among the different instances of your application so that they don't
>>>> > overlap. A typical way to solve this "partition distribution"
>>>> > problem is to elect a single instance as the "leader" and have the
>>>> > leader assign partitions to the group.
>>>> > The ZkJobCoordinator uses ZooKeeper primitives to achieve this, while
>>>> > the YarnJC relies on Yarn's guarantee that there will be a single
>>>> > AppMaster.
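The leader-based approach can be modeled in a few lines. The sketch below is a simplified, in-memory model of a common ZooKeeper leader-election recipe (the processor holding the lowest ephemeral-sequential node wins) and is not the ZkJobCoordinator's actual code; the node names and the round-robin assignment policy are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class LeaderAssignmentSketch {

    // Processors register ephemeral-sequential nodes such as "processor-0000000003".
    // ZooKeeper zero-pads the sequence suffix, so with a shared prefix the lexicographically
    // smallest name is the oldest live registration; this sketch treats it as the leader.
    static String electLeader(List<String> liveNodes) {
        return Collections.min(liveNodes);
    }

    // Run only by the leader: deal partitions round-robin over the live processors so
    // every partition has exactly one owner and no two processors overlap.
    static Map<String, List<Integer>> assign(List<String> liveNodes, int numPartitions) {
        List<String> sorted = new ArrayList<>(liveNodes);
        Collections.sort(sorted);
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String node : sorted) {
            assignment.put(node, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(sorted.get(p % sorted.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> live = Arrays.asList(
                "processor-0000000007", "processor-0000000003", "processor-0000000005");
        System.out.println("leader: " + electLeader(live));
        System.out.println(assign(live, 5));
    }
}
```

If the leader's session expires, its node disappears and the next-lowest node takes over, which is what keeps exactly one assigner alive.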
>>>> >
>>>> > A key difference that separates the PassthroughJC from the
>>>> > Yarn/Zk variants is that it does _not_ attempt to solve the
>>>> > "partition distribution" problem. As a result, there's no leader
>>>> > election involved. Instead, it pushes the problem of "partition
>>>> > distribution" to the underlying consumer.
>>>> >
>>>> > The PassthroughJC supports these two scenarios:
>>>> >
>>>> > 1. Consumer-managed partition distribution: When using the Kafka 
>>>> > high-level consumer (or an AWS KinesisClientLibrary consumer) 
>>>> > with Samza, the consumer manages partitions internally.
>>>> >
>>>> > 2. Static partition distribution: Alternatively, partitions can be
>>>> > managed statically using configuration. You can achieve static
>>>> > partition assignment by implementing a custom
>>>> > SystemStreamPartitionGrouper<https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>
>>>> > and TaskNameGrouper<https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>.
>>>> > Solutions in this category will typically require you to
>>>> > distinguish the various processors in the group by providing an "id" for each.
>>>> > Once the "id"s are decided, you can then statically compute
>>>> > assignments using a function (e.g., modulo N).
>>>> > You can rely on the following mechanisms to provide this id:
>>>> >  - Configure each instance differently to have its own id.
>>>> >  - Obtain the id from the cluster manager. For instance, a Kubernetes
>>>> > StatefulSet gives each pod a unique ordinal in the range [0, N).
>>>> > AWS ECS should expose similar capabilities via a REST endpoint.
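A minimal sketch of the static "id + modulo N" scheme, assuming the processors run as a Kubernetes StatefulSet, where each pod's name (and hostname) ends in its ordinal. `ordinalFromHostname` and `partitionsFor` are hypothetical helpers, not Samza APIs; in a real job this logic would sit inside the custom grouper implementations mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

class StaticPartitionAssignment {

    // Hypothetical helper: StatefulSet pods are named "<statefulset-name>-<ordinal>"
    // (e.g. "my-samza-job-2"), and that name is also the pod's hostname.
    static int ordinalFromHostname(String hostname) {
        int dash = hostname.lastIndexOf('-');
        if (dash < 0 || dash == hostname.length() - 1) {
            throw new IllegalArgumentException("no ordinal suffix in hostname: " + hostname);
        }
        return Integer.parseInt(hostname.substring(dash + 1));
    }

    // Static assignment by modulo: partition p belongs to processor (p % numProcessors),
    // so processors 0..N-1 receive disjoint sets that together cover every partition.
    static List<Integer> partitionsFor(int processorId, int numProcessors, int numPartitions) {
        if (processorId < 0 || processorId >= numProcessors) {
            throw new IllegalArgumentException("processor id must be in [0, " + numProcessors + ")");
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = processorId; p < numPartitions; p += numProcessors) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Inside a pod you would typically read the HOSTNAME environment variable instead.
        int id = ordinalFromHostname("my-samza-job-1");
        System.out.println("processor " + id + " owns " + partitionsFor(id, 3, 8));
        // prints "processor 1 owns [1, 4, 7]"
    }
}
```

If every instance reported the same id (for example a hard-coded "0"), every instance would claim the same partitions, which is why each processor needs a distinct id.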
>>>> >
>>>> > >> One thing I'm still curious about, is what are the drawbacks or
>>>> > >> complexities of leveraging the Kafka High-level consumer +
>>>> > >> PassthroughJobCoordinator in a stand-alone setup like this?
>>>> >
>>>> > Leveraging the Kafka High-level consumer:
>>>> >
>>>> > The Kafka high-level consumer is not integrated into Samza just yet.
>>>> > Instead, Samza's integration with Kafka uses the low-level
>>>> > consumer because:
>>>> > i) It allows for greater control in fetching data from individual
>>>> > brokers. In terms of the threading model, it is simple and performant
>>>> > to have one thread pull from each broker.
>>>> > ii) It is efficient in memory utilization, since it does not do
>>>> > internal buffering of messages.
>>>> > iii) There's no overhead like the consumer-group heartbeats that
>>>> > are driven by consumer.poll.
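To make the "one thread per broker" point concrete, the sketch below groups partitions by their leader broker and starts one fetch worker per broker. It is a simplified model built on assumed partition-to-leader metadata, not Samza's Kafka consumer code; the topic names and broker ids are made up, and each worker only prints what it would fetch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PerBrokerFetchers {

    // Group partitions by their leader broker so each broker is served by exactly one
    // fetch thread, no matter how many of its partitions this processor owns.
    static Map<Integer, List<String>> groupByLeader(Map<String, Integer> leaderByPartition) {
        Map<Integer, List<String>> byBroker = new TreeMap<>();
        for (Map.Entry<String, Integer> e : leaderByPartition.entrySet()) {
            byBroker.computeIfAbsent(e.getValue(), broker -> new ArrayList<>()).add(e.getKey());
        }
        return byBroker;
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical metadata: "topic-partition" -> id of the broker leading it.
        Map<String, Integer> leaders = new TreeMap<>();
        leaders.put("events-0", 1);
        leaders.put("events-1", 2);
        leaders.put("events-2", 1);
        leaders.put("events-3", 3);

        Map<Integer, List<String>> byBroker = groupByLeader(leaders);
        ExecutorService pool = Executors.newFixedThreadPool(byBroker.size());
        for (Map.Entry<Integer, List<String>> e : byBroker.entrySet()) {
            // A real fetcher would loop, issuing one fetch request per broker per round.
            pool.submit(() -> System.out.println("broker " + e.getKey() + " fetches " + e.getValue()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```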
>>>> >
>>>> > Since there's no built-in integration, you will have to build a
>>>> > new SystemConsumer if you need to integrate with the Kafka
>>>> > high-level consumer. Further, there's a fair bit more complexity
>>>> > to manage in checkpointing.
>>>> >
>>>> > >> The Kafka High-level consumer comes with other nice tools for 
>>>> > >> monitoring offsets, lag, etc
>>>> >
>>>> > Samza exposes<https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
>>>> > the following metrics for lag monitoring:
>>>> > - The current log-end offset for each partition
>>>> > - The last checkpointed offset for each partition
>>>> > - The number of messages behind the high watermark of the partition
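These metrics relate to one another through simple offset arithmetic. A sketch, assuming the checkpoint stores the offset of the last processed message (so processing resumes at checkpoint + 1) and that the log-end offset (high watermark) is the offset the next produced message will receive; the offsets below are invented for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

class PartitionLag {

    // Messages still to be processed in one partition, under the assumptions above.
    static long lag(long logEndOffset, long checkpointedOffset) {
        return Math.max(0L, logEndOffset - (checkpointedOffset + 1));
    }

    // Total lag for a job; each value is {logEndOffset, checkpointedOffset}.
    static long totalLag(Map<Integer, long[]> offsetsByPartition) {
        long total = 0;
        for (long[] offsets : offsetsByPartition.values()) {
            total += lag(offsets[0], offsets[1]);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, long[]> offsets = new TreeMap<>();
        offsets.put(0, new long[] {100L, 89L}); // 10 messages behind
        offsets.put(1, new long[] {50L, 49L});  // fully caught up
        offsets.forEach((p, o) -> System.out.println("partition " + p + " lag " + lag(o[0], o[1])));
        System.out.println("total lag " + totalLag(offsets));
    }
}
```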
>>>> >
>>>> > Please let us know if you need help discovering these or 
>>>> > integrating these with other systems/tools.
>>>> >
>>>> >
>>>> > Leveraging the Passthrough JobCoordinator:
>>>> >
>>>> > It's helpful to split this discussion of the tradeoffs with the
>>>> > PassthroughJC into 2 parts:
>>>> >
>>>> > 1. PassthroughJC + consumer managed partitions:
>>>> >
>>>> > - In this model, Samza has no control over partition assignment,
>>>> > since it's managed by the consumer. This means that stateful
>>>> > operations like joins, which rely on partitions being co-located on
>>>> > the same task, will not work.
>>>> > Simple stateless operations (e.g., map, filter, remote lookups) are fine.
>>>> >
>>>> > - A key differentiator between Samza and other frameworks is our
>>>> > support for "host affinity<https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>".
>>>> > Samza achieves this by assigning partitions to hosts, taking data
>>>> > locality into account. If the consumer can arbitrarily shuffle
>>>> > partitions, it'd be hard to support this affinity/locality. Often
>>>> > this is a key optimization when dealing with large stateful jobs.
>>>> >
>>>> > 2. PassthroughJC + static partitions:
>>>> >
>>>> > - In this model, it is possible to make stateful processing 
>>>> > (including host affinity) work by carefully choosing how "id"s 
>>>> > are assigned and computed.
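Why the choice of ids matters for joins can be checked directly: a join on a shared key needs partition p of both input streams to land on the same processor. The sketch assumes both streams have the same partition count and the same partitioner; the "shifted" assignment is a hypothetical stand-in for a consumer that balances each topic independently, and `CoPartitioningCheck` is an illustrative name.

```java
import java.util.function.IntUnaryOperator;

class CoPartitioningCheck {

    // True if, for every partition p, stream A's partition p and stream B's partition p
    // are owned by the same processor, i.e. matching keys meet in one place.
    static boolean coLocated(int numPartitions, IntUnaryOperator ownerA, IntUnaryOperator ownerB) {
        for (int p = 0; p < numPartitions; p++) {
            if (ownerA.applyAsInt(p) != ownerB.applyAsInt(p)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int processors = 3;
        // The same static modulo rule applied to both streams keeps them co-partitioned.
        IntUnaryOperator modulo = p -> p % processors;
        // Hypothetical independent balancing that happens to offset one stream by one slot.
        IntUnaryOperator shifted = p -> (p + 1) % processors;
        System.out.println(coLocated(6, modulo, modulo));  // prints "true"
        System.out.println(coLocated(6, modulo, shifted)); // prints "false"
    }
}
```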
>>>> >
>>>> > Recommendation:
>>>> >
>>>> > - Owing to the above subtleties, I would recommend that we give 
>>>> > the ZkJobCoordinator + the built-in low-level Kafka integration a try.
>>>> > - If we hit snags down this path, we can certainly explore the 
>>>> > approach with PassthroughJC + static partitions.
>>>> > - Using the PassthroughJC + consumer-managed distribution would 
>>>> > be least preferable owing to the subtleties I outlined above.
>>>> >
>>>> > Please let us know should you have more questions.
>>>> >
>>>> > Best,
>>>> > Jagdish
>>>> >
>>>> > On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges <tstumpges@ntent.com> wrote:
>>>> > Wow, what great timing, and what a great thread! I definitely 
>>>> > have some good starters to go off of here.
>>>> >
>>>> > If it is helpful for everyone, once I get the low-level API + 
>>>> > ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate 
>>>> > an additional sample for hello-samza.
>>>> >
>>>> > One thing I'm still curious about, is what are the drawbacks or 
>>>> > complexities of leveraging the Kafka High-level consumer + 
>>>> > PassthroughJobCoordinator in a stand-alone setup like this? We do 
>>>> > have Zookeeper (because of kafka) so I think either would work. 
>>>> > The Kafka High-level consumer comes with other nice tools for 
>>>> > monitoring offsets, lag, etc....
>>>> >
>>>> > Thanks guys!
>>>> > -Thunder
>>>> >
>>>> > -----Original Message-----
>>>> > From: Tom Davis [mailto:tom@recursivedream.com]
>>>> > Sent: Wednesday, March 14, 2018 17:50
>>>> > To: dev@samza.apache.org
>>>> > Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>>>> >
>>>> > Hey there!
>>>> >
>>>> > You are correct that this is focused on the higher-level API but
>>>> > doesn't preclude using the lower-level API. I was at the same
>>>> > point you were not long ago, in fact, and had a very productive
>>>> > conversation on the list: you should look for "Question about
>>>> > custom StreamJob/Factory" in the list archive for the past couple
>>>> > of months.
>>>> >
>>>> > I'll quote Jagadish Venkatraman from that thread:
>>>> >
>>>> > > For the section on the low-level API, can you use 
>>>> > > LocalApplicationRunner#runTask()? It basically creates a new 
>>>> > > StreamProcessor and runs it. Remember to provide task.class and 
>>>> > > set it to your implementation of StreamTask or AsyncStreamTask. 
>>>> > > Please note that this is an evolving API and hence, subject to change.
>>>> >
>>>> > I ended up just switching to the high-level API because I don't
>>>> > have any existing Tasks and the Kubernetes story is a little more
>>>> > straightforward there (there's only one container/configuration to deploy).
>>>> >
>>>> > Best,
>>>> >
>>>> > Tom
>>>> >
>>>> > Thunder Stumpges <tstumpges@ntent.com> writes:
>>>> >
>>>> > > Hi all,
>>>> > >
>>>> > > We are using Samza (0.12.0) in about 2 dozen jobs implementing
>>>> > > several processing pipelines. We have also begun a significant
>>>> > > move of other services within our company to Docker/Kubernetes.
>>>> > > Right now our Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce"
>>>> > > jobs (many reporting and other batch processing jobs). We would really
>>>> > > like to move our stream processing off of Hadoop/Yarn and onto Kubernetes.
>>>> > >
>>>> > > When I just read about some of the new progress in 0.13 and 0.14
>>>> > > I got really excited! We would love to have our jobs run as
>>>> > > simple libraries in our own JVM, and use the Kafka
>>>> > > High-Level-Consumer for partition distribution and such. This would
>>>> > > let us "dockerfy" our application and run/scale in Kubernetes.
>>>> > >
>>>> > > However, as I read it, this new deployment model is ONLY for the
>>>> > > new(er) High Level API, correct? Is there a plan and/or
>>>> > > resources for adapting this back to existing low-level tasks?
>>>> > > How complicated a task is that? Do I have any other options to make
>>>> > > this transition easier?
>>>> > >
>>>> > > Thanks in advance.
>>>> > > Thunder
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > Jagadish V,
>>>> > Graduate Student,
>>>> > Department of Computer Science,
>>>> > Stanford University
>>>> >
>>>> >
>>>>
>>>>
>>>
>>>


--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University