samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Chinmay Soman" <chinmay.cere...@gmail.com>
Subject Re: Review Request 27649: SAMZA-448
Date Thu, 13 Nov 2014 21:40:13 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27649/#review61298
-----------------------------------------------------------

Ship it!


Looks good overall. Added minor comments in the review


samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/27649/#comment102848>

    Do we need to handle cases where messageMap might be empty ? (Not sure if some callers
might do that instead of sending null)



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/27649/#comment102849>

    Maybe take key and value arguments for this constructor as well ?



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/27649/#comment102850>

    Maybe return an unmodifiable map ?



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
<https://reviews.apache.org/r/27649/#comment102857>

    Wait - shouldn't the type be something like "delete-config" ? 
    
    Or am I missing something ?



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/27649/#comment102861>

    Maybe use CoordinatorSystemAdmin just for consistency ? (more of a cosmetic comment -
feel free to ignore)



samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
<https://reviews.apache.org/r/27649/#comment102858>

    Null check for systemStreamMetadataMap ?



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
<https://reviews.apache.org/r/27649/#comment102872>

    Add logging here to see the progress ?



samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
<https://reviews.apache.org/r/27649/#comment102873>

    I guess I'm still confused - what does 'type' in a Delete message mean ? 
    
    From this line it looks like the type is "set-config". Can you please elaborate ?



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
<https://reviews.apache.org/r/27649/#comment102874>

    Brief comment ?



samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
<https://reviews.apache.org/r/27649/#comment102875>

    Spelling mistake : coordinatorSystemConfig


- Chinmay Soman


On Nov. 13, 2014, 5:34 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27649/
> -----------------------------------------------------------
> 
> (Updated Nov. 13, 2014, 5:34 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-448
>     https://issues.apache.org/jira/browse/SAMZA-448
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> remove SystemAdmin.createCoordinatorStream, and create a CoordinatorStreamAdmin instead.
> 
> 
> more feedback from yan
> 
> 
> partial fix of yan's review comments
> 
> 
> add logging info
> 
> 
> make deletes actually work
> 
> 
> add javadocs to mock coordinator stream classes.
> 
> 
> delete old configs when job runner publishes new configs
> 
> 
> add rewriting into job coordinator
> 
> 
> make process job properly set only coordinator stream config
> 
> 
> all tests pass
> 
> 
> fix samza container performance test
> 
> 
> explicitly flush all buffers when closing the kafka producer. fix stateful task test.
> 
> 
> fix kafka tests
> 
> 
> all core tests work
> 
> 
> fix test checkpoint tool by adding a mock coordinator consumer that dumps the entire
config
> 
> 
> working on fixing checkpoint tool tests
> 
> 
> fleshing out the coordinator stream message javadocs
> 
> 
> remove duplicate code from coordinator system factory
> 
> 
> add more javadocs. clean up todos inkafka system admin.
> 
> 
> remove yarn.container.count from yarn config, but use it as a fallback to job.container.count
> 
> 
> add some docs and headers to the coodinator stream and system admin
> 
> 
> refactoring to add coordinator stream system consumer
> 
> 
> cleanup source in job runne
> 
> 
> abstract coordinator system producer creation into a factory
> 
> 
> add todos
> 
> 
> config stream works
> 
> 
> create coordinator stream in system admin
> 
> 
> connecting job coordinator to job runner via coordinator stream
> 
> 
> add util and logging methods
> 
> 
> adding coordinator message and system producer wrapper
> 
> 
> Diffs
> -----
> 
>   build.gradle 828bce9913db00161971607e4c9ac19c63cecb95 
>   samza-api/src/main/java/org/apache/samza/system/CoordinatorSystemAdmin.java PRE-CREATION

>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 571c60631357ea9a0b4fa24e7253008619ef2f32

>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
38e313f3c39454110efd354e6ca025869fa930cd 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ddc30af7c52d8a4d5c5de02f6757c040b1f31c93

>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 3b6685e00837a4aaf809813e62b7e52823bc07a9

>   samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02

>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala c14f2f623bb4bae911dd3085ce428175930e4545

>   samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 16345cd1c1354a0d25a0000d81a307dbe3abbe81

>   samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 6985af6e7cc0d408fa07fbac60141d1126323777

>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 530255e5866bc49ec5ce1a0b7437470cd4e17010

>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 1a67586eeec95dabfeb3b6881af9b3865c3029ca

>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
PRE-CREATION 
>   samza-core/src/test/resources/test.properties 9348c7de956ebf02f58a163dc6fb391a7e29ae64

>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala af800dfeedbfea75abaac3f15fd53bc55b743daf

>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 1eb0edab1bc792ccf8c503b03687284451ab0f34

>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 5ac33ea36da451250655d9dd373692b964322b41

>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 4ed5e881031e019d8df6de259cabb658820a3ba0

>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
a0e1ccbfe9dc4fd26ca6b30fc2d1348fb7d007e4 
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
5ceb1093a66cb57e298d4b3ccdd24845dbb41b58 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java fa1d51b290013a3913d64884dc43907a76670849

>   samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
118f5eee22016db3b802c32fb26c5d72fa61f1a7 
>   samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
d589d762a18f9425aa8d8dd589011a151bcb59a4 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 03395e2efa0fec723e354177d06bfacf7d2a9215

>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 91aff3c5e0a2bcea45120d794371fca1c638ccfe

>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d

>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
b0b6543856cb87888c5a719182ad9576b51bba1a 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 24b11da06a69da734c85720ef39d65ee46d821d5

>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
81dea9d6d1921462b200c62dbdf016c0eb2f01b2 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
cab5101c5c9e2a979bca545fa8046e93dcfe46e2 
> 
> Diff: https://reviews.apache.org/r/27649/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message